Skip to content

Commit

Permalink
Updated MQTT Expander
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickRitchie committed Dec 6, 2023
1 parent 817dbae commit 84abbb5
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 124 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2023 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

using System.Collections.Generic;

namespace MTConnect.Configurations
{
public interface IMTConnectMqttExpanderConfiguration : IMTConnectMqttClientConfiguration
{
IEnumerable<string> Devices { get; set; }

string DocumentFormat { get; set; }

string ExpandedTopicPrefix { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2023 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Text.Json.Serialization;

namespace MTConnect.Configurations
{
public class MTConnectMqttExpanderConfiguration : MTConnectMqttClientConfiguration, IMTConnectMqttExpanderConfiguration
{
[JsonPropertyName("devices")]
public IEnumerable<string> Devices { get; set; }

[JsonPropertyName("documentFormat")]
public string DocumentFormat { get; set; }

[JsonPropertyName("expandedTopicPrefix")]
public string ExpandedTopicPrefix { get; set; }


public MTConnectMqttExpanderConfiguration()
{
DocumentFormat = MTConnect.DocumentFormat.XML;
ExpandedTopicPrefix = "MTConnect-Expanded";
}
}
}
222 changes: 98 additions & 124 deletions libraries/MTConnect.NET-MQTT/MTConnectMqttExpander.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
using MQTTnet;
using MQTTnet.Client;
using MTConnect.Assets;
using MTConnect.Configurations;
using MTConnect.Devices;
using MTConnect.Formatters;
using MTConnect.Mqtt;
using MTConnect.Observations;
using MTConnect.Streams;
using System;
using System.Collections.Generic;
using System.IO;
Expand All @@ -21,27 +20,10 @@ namespace MTConnect.Clients
{
public class MTConnectMqttExpander : IDisposable
{
private const string _defaultTopicPrefix = "MTConnect";
private const string _defaultDocumentFormat = "json";

private readonly IMTConnectMqttExpanderConfiguration _configuration;
private readonly IMTConnectEntityClient _inputClient;
private readonly MqttFactory _mqttFactory;
private readonly IMqttClient _mqttClient;
private readonly string _server;
private readonly int _port;
private readonly int _qos;
private readonly int _interval;
private readonly string _deviceUuid;
private readonly string _topicPrefix;
private readonly string _documentFormat;
private readonly string _username;
private readonly string _password;
private readonly string _clientId;
private readonly string _caCertPath;
private readonly string _pemClientCertPath;
private readonly string _pemPrivateKeyPath;
private readonly bool _allowUntrustedCertificates;
private readonly bool _useTls;

private CancellationTokenSource _stop;
private MTConnectMqttConnectionStatus _connectionStatus;

Expand All @@ -50,15 +32,17 @@ public class MTConnectMqttExpander : IDisposable
/// </summary>
public int ReconnectionInterval { get; set; }

public string Server => _server;
public string Server => _configuration.Server;

public int Port => _configuration.Port;

public int Port => _port;
public int QoS => _configuration.QoS;

public int QoS => _qos;
public int Interval => _configuration.Interval;

public int Interval => _interval;
public string TopicPrefix => _configuration.TopicPrefix;

public string TopicPrefix => _topicPrefix;
public string ExpandedTopicPrefix => _configuration.ExpandedTopicPrefix;

public MTConnectMqttConnectionStatus ConnectionStatus => _connectionStatus;

Expand Down Expand Up @@ -96,16 +80,20 @@ public class MTConnectMqttExpander : IDisposable
public event EventHandler ClientStopped;


public MTConnectMqttExpander(string server, int port = 1883, string topicPrefix = _defaultTopicPrefix, string documentFormat = _defaultDocumentFormat, string clientId = null, int qos = 1)
public MTConnectMqttExpander(IMTConnectMqttExpanderConfiguration configuration, IMTConnectEntityClient inputClient)
{
ReconnectionInterval = 10000;

_server = server;
_port = port;
_topicPrefix = topicPrefix;
_documentFormat = documentFormat;
_clientId = clientId;
_qos = qos;
_configuration = configuration;
if (_configuration == null) _configuration = new MTConnectMqttExpanderConfiguration();

_inputClient = inputClient;
if (_inputClient != null)
{
_inputClient.DeviceReceived += DeviceReceived;
_inputClient.ObservationReceived += ObservationReceived;
_inputClient.AssetReceived += AssetReceived;
}

_mqttFactory = new MqttFactory();
_mqttClient = _mqttFactory.CreateMqttClient();
Expand Down Expand Up @@ -143,30 +131,30 @@ private async Task Worker()
try
{
// Declare new MQTT Client Options with Tcp Server
var clientOptionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(_server, _port);
var clientOptionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(_configuration.Server, _configuration.Port);

clientOptionsBuilder.WithCleanSession(false);

// Set Client ID
if (!string.IsNullOrEmpty(_clientId))
if (!string.IsNullOrEmpty(_configuration.ClientId))
{
clientOptionsBuilder.WithClientId(_clientId);
clientOptionsBuilder.WithClientId(_configuration.ClientId);
}

var certificates = new List<X509Certificate2>();

// Add CA (Certificate Authority)
if (!string.IsNullOrEmpty(_caCertPath))
if (!string.IsNullOrEmpty(_configuration.CertificateAuthority))
{
certificates.Add(new X509Certificate2(GetFilePath(_caCertPath)));
certificates.Add(new X509Certificate2(GetFilePath(_configuration.CertificateAuthority)));
}

// Add Client Certificate & Private Key
if (!string.IsNullOrEmpty(_pemClientCertPath) && !string.IsNullOrEmpty(_pemPrivateKeyPath))
if (!string.IsNullOrEmpty(_configuration.PemCertificate) && !string.IsNullOrEmpty(_configuration.PemPrivateKey))
{

#if NET5_0_OR_GREATER
certificates.Add(new X509Certificate2(X509Certificate2.CreateFromPemFile(GetFilePath(_pemClientCertPath), GetFilePath(_pemPrivateKeyPath)).Export(X509ContentType.Pfx)));
certificates.Add(new X509Certificate2(X509Certificate2.CreateFromPemFile(GetFilePath(_configuration.PemCertificate), GetFilePath(_configuration.PemPrivateKey)).Export(X509ContentType.Pfx)));
#else
throw new Exception("PEM Certificates Not Supported in .NET Framework 4.8 or older");
#endif
Expand All @@ -175,23 +163,23 @@ private async Task Worker()
{
UseTls = true,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
IgnoreCertificateRevocationErrors = _allowUntrustedCertificates,
IgnoreCertificateChainErrors = _allowUntrustedCertificates,
AllowUntrustedCertificates = _allowUntrustedCertificates,
IgnoreCertificateRevocationErrors = _configuration.AllowUntrustedCertificates,
IgnoreCertificateChainErrors = _configuration.AllowUntrustedCertificates,
AllowUntrustedCertificates = _configuration.AllowUntrustedCertificates,
Certificates = certificates
});
}

// Add Credentials
if (!string.IsNullOrEmpty(_username) && !string.IsNullOrEmpty(_password))
if (!string.IsNullOrEmpty(_configuration.Username) && !string.IsNullOrEmpty(_configuration.Password))
{
if (_useTls)
if (_configuration.UseTls)
{
clientOptionsBuilder.WithCredentials(_username, _password).WithTls();
clientOptionsBuilder.WithCredentials(_configuration.Username, _configuration.Password).WithTls();
}
else
{
clientOptionsBuilder.WithCredentials(_username, _password);
clientOptionsBuilder.WithCredentials(_configuration.Username, _configuration.Password);
}
}

Expand All @@ -201,17 +189,6 @@ private async Task Worker()
// Connect to the MQTT Client
_mqttClient.ConnectAsync(clientOptions).Wait();

//if (!string.IsNullOrEmpty(_deviceUuid))
//{
// // Start protocol for a single Device
// StartDeviceProtocol(_deviceUuid).Wait();
//}
//else
//{
// // Start protocol for all devices
// StartAllDevicesProtocol().Wait();
//}

ClientStarted?.Invoke(this, new EventArgs());

while (_mqttClient.IsConnected && !_stop.IsCancellationRequested)
Expand Down Expand Up @@ -247,13 +224,18 @@ private async Task Worker()
}


public async void DeviceReceived(object sender, IDevice device)
{
await PublishDevice(device);
}

public async Task PublishDevice(IDevice device)
{
if (_mqttClient != null && _mqttClient.IsConnected && device != null)
{
var topic = $"{_topicPrefix}/Devices/{device.Uuid}/Device";
var topic = $"{_configuration.TopicPrefix}/Devices/{device.Uuid}/Device";

var output = EntityFormatter.Format(_documentFormat, device);
var output = EntityFormatter.Format(_configuration.DocumentFormat, device);
if (output != null)
{
var message = new MqttApplicationMessage();
Expand All @@ -274,78 +256,70 @@ public async Task PublishDevice(IDevice device)
}
}

public async Task PublishCurrent(IEnumerable<IObservation> observations)
{
if (_mqttClient != null && _mqttClient.IsConnected && observations != null)
{
var deviceUuids = observations.Select(o => o.DeviceUuid).Distinct();
foreach (var deviceUuid in deviceUuids)
{
var dataItemIds = observations.Select(o => o.DataItemId).Distinct();
foreach (var dataItemId in dataItemIds)
{
var dataItemObservations = observations.Where(o => o.DataItemId == dataItemId);

var topic = $"{_topicPrefix}/Devices/{deviceUuid}/Observations/{dataItemId}/current";

var output = EntityFormatter.Format(_documentFormat, dataItemObservations);
if (output != null)
{
var message = new MqttApplicationMessage();
message.Topic = topic;
message.Payload = System.Text.Encoding.UTF8.GetBytes(output);
message.Retain = true;

var result = await _mqttClient.PublishAsync(message);
if (result.IsSuccess)
{

}
else
{

}
}
}
}
}
public async void ObservationReceived(object sender, IObservation observation)
{
await PublishObservation(observation);
}

public async Task PublishSample(IEnumerable<IObservation> observations)
public async Task PublishObservation(IObservation observation)
{
if (_mqttClient != null && _mqttClient.IsConnected && observations != null)
{
var deviceUuids = observations.Select(o => o.DeviceUuid).Distinct();
foreach (var deviceUuid in deviceUuids)
{
var dataItemIds = observations.Select(o => o.DataItemId).Distinct();
foreach (var dataItemId in dataItemIds)
{
var dataItemObservations = observations.Where(o => o.DataItemId == dataItemId);

var topic = $"{_topicPrefix}/Devices/{deviceUuid}/Observations/{dataItemId}/sample";
//if (_mqttClient != null && _mqttClient.IsConnected && observation != null)
//{
// var topic = $"{_configuration.TopicPrefix}/Devices/{device.Uuid}/Device";

// var output = EntityFormatter.Format(_configuration.DocumentFormat, device);
// if (output != null)
// {
// var message = new MqttApplicationMessage();
// message.Topic = topic;
// message.Payload = System.Text.Encoding.UTF8.GetBytes(output);
// message.Retain = true;

// var result = await _mqttClient.PublishAsync(message);
// if (result.IsSuccess)
// {

// }
// else
// {

// }
// }
//}
}

var output = EntityFormatter.Format(_documentFormat, dataItemObservations);
if (output != null)
{
var message = new MqttApplicationMessage();
message.Topic = topic;
message.Payload = System.Text.Encoding.UTF8.GetBytes(output);
message.Retain = true;

var result = await _mqttClient.PublishAsync(message);
if (result.IsSuccess)
{
public async void AssetReceived(object sender, IAsset asset)
{

}
else
{
}

}
}
}
}
}
public async Task PublishAsset(IAsset asset)
{
//if (_mqttClient != null && _mqttClient.IsConnected && asset != null)
//{
// var topic = $"{_configuration.TopicPrefix}/Devices/{device.Uuid}/Device";

// var output = EntityFormatter.Format(_configuration.DocumentFormat, device);
// if (output != null)
// {
// var message = new MqttApplicationMessage();
// message.Topic = topic;
// message.Payload = System.Text.Encoding.UTF8.GetBytes(output);
// message.Retain = true;

// var result = await _mqttClient.PublishAsync(message);
// if (result.IsSuccess)
// {

// }
// else
// {

// }
// }
//}
}


Expand Down

0 comments on commit 84abbb5

Please sign in to comment.