Skip to content

Commit

Permalink
Updated MQTT Adapter Agent Module
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickRitchie committed Dec 12, 2023
1 parent fd9291a commit 4484b1f
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 80 deletions.
194 changes: 129 additions & 65 deletions agent/Modules/MTConnect.NET-AgentModule-MqttAdapter/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
using MQTTnet.Protocol;
using MQTTnet.Server;
using MTConnect.Agents;
using MTConnect.Assets;
using MTConnect.Configurations;
using MTConnect.Devices;
using MTConnect.Formatters;
using MTConnect.Input;
using MTConnect.Logging;
Expand All @@ -23,6 +25,9 @@ public class Module : MTConnectAgentModule
{
public const string ConfigurationTypeId = "mqtt-adapter";
private const string ModuleId = "MQTT Adapter";
private const string ObservationTopic = "observations";
private const string AssetTopic = "assets";
private const string DeviceTopic = "device";

private readonly ModuleConfiguration _configuration;
private readonly IMTConnectAgentBroker _mtconnectAgent;
Expand Down Expand Up @@ -79,6 +84,9 @@ private async Task Worker()
// Publish Only so use Clean Session = true
clientOptionsBuilder.WithCleanSession(_configuration.CleanSession);

// Sets the Timeout
clientOptionsBuilder.WithTimeout(TimeSpan.FromMilliseconds(_configuration.Timeout));

// Set Client ID
if (!string.IsNullOrEmpty(_configuration.ClientId))
{
Expand Down Expand Up @@ -139,7 +147,7 @@ private async Task Worker()

Log(MTConnectLogLevel.Information, $"MQTT Adapter Connected to External Broker ({_configuration.Server}:{_configuration.Port})");

if (!string.IsNullOrEmpty(_configuration.Topic))
if (!string.IsNullOrEmpty(_configuration.TopicPrefix))
{
// Set QoS
MqttQualityOfServiceLevel qos;
Expand All @@ -151,9 +159,9 @@ private async Task Worker()
}

// Subscribe to Topic
await _mqttClient.SubscribeAsync($"{_configuration.Topic}/#", qos);
await _mqttClient.SubscribeAsync($"{_configuration.TopicPrefix}/#", qos);

Log(MTConnectLogLevel.Information, $"MQTT Adapter Subscribed to ({_configuration.Topic} @ QoS = {qos})");
Log(MTConnectLogLevel.Information, $"MQTT Adapter Subscribed to ({_configuration.TopicPrefix} @ QoS = {qos})");
}
else
{
Expand All @@ -172,7 +180,7 @@ private async Task Worker()

Log(MTConnectLogLevel.Information, $"MQTT Adapter Disconnected from External Broker ({_configuration.Server}:{_configuration.Port})");

await Task.Delay(_configuration.RetryInterval, _stop.Token);
await Task.Delay(_configuration.ReconnectInterval, _stop.Token);
}
catch (TaskCanceledException) { }
catch (Exception) { }
Expand All @@ -194,32 +202,86 @@ private Task MessageReceived(MqttApplicationMessageReceivedEventArgs args)
{
var topic = args.ApplicationMessage.Topic;

var observations = ProcessPayload(args.ApplicationMessage.Payload);
if (!observations.IsNullOrEmpty())
if (IsObservationTopic(topic))
{
_mtconnectAgent.AddObservations(_configuration.DeviceKey, observations);
var observations = ProcessObservationPayload(args.ApplicationMessage.Payload);
if (!observations.IsNullOrEmpty())
{
_mtconnectAgent.AddObservations(_configuration.DeviceKey, observations);
}
}
else if (IsAssetTopic(topic))
{
var assets = ProcessAssetPayload(args.ApplicationMessage.Payload);
if (!assets.IsNullOrEmpty())
{
_mtconnectAgent.AddAssets(_configuration.DeviceKey, assets);
}
}
else if (IsDeviceTopic(topic))
{
var device = ProcessDevicePayload(args.ApplicationMessage.Payload);
if (device != null && !string.IsNullOrEmpty(device.Uuid) && !string.IsNullOrEmpty(device.Name))
{
if (!string.IsNullOrEmpty(_configuration.DeviceKey))
{
if (_configuration.DeviceKey.ToLower() == device.Uuid.ToLower() ||
_configuration.DeviceKey.ToLower() == device.Name.ToLower())
{
_mtconnectAgent.AddDevice(device);
}
}
else
{
_mtconnectAgent.AddDevice(device);
}
}
}
}

return Task.CompletedTask;
}

private IEnumerable<IObservationInput> ProcessPayload(byte[] payload)
private bool IsObservationTopic(string topic)
{
if (topic != null)
{
var prefix = $"{_configuration.TopicPrefix}/{ObservationTopic}";
return topic.StartsWith(prefix);
}

return false;
}

private bool IsAssetTopic(string topic)
{
if (topic != null)
{
var prefix = $"{_configuration.TopicPrefix}/{AssetTopic}";
return topic.StartsWith(prefix);
}

return false;
}

private bool IsDeviceTopic(string topic)
{
if (topic != null)
{
var prefix = $"{_configuration.TopicPrefix}/{DeviceTopic}";
return topic.StartsWith(prefix);
}

return false;
}


private IEnumerable<IObservationInput> ProcessObservationPayload(byte[] payload)
{
if (!payload.IsNullOrEmpty())
{
try
{
//// Decompress from gzip
//byte[] uncompressedBytes;
//using (var inputStream = new MemoryStream(payload))
//using (var outputStream = new MemoryStream())
//using (var encodingStream = new GZipStream(inputStream, CompressionMode.Decompress, true))
//{
// encodingStream.CopyTo(outputStream);
// uncompressedBytes = outputStream.ToArray();
//}

var uncompressedBytes = payload;
if (!uncompressedBytes.IsNullOrEmpty())
{
Expand All @@ -228,18 +290,6 @@ private IEnumerable<IObservationInput> ProcessPayload(byte[] payload)
{
return readResult.Content;
}

//// Convert JSON bytes to Mqtt PayloadModel
//IEnumerable<MTConnectMqttInputObservations> payloadModel = null;
//using (var inputStream2 = new MemoryStream(uncompressedBytes))
//{
// payloadModel = JsonSerializer.Deserialize<IEnumerable<MTConnectMqttInputObservations>>(inputStream2);
//}

//if (payloadModel != null)
//{
// return payloadModel;
//}
}
}
catch (Exception ex)
Expand All @@ -251,41 +301,55 @@ private IEnumerable<IObservationInput> ProcessPayload(byte[] payload)
return null;
}

//public static byte[] CreatePayload(TrakHoundEntityCollection collection)
//{
// if (collection != null)
// {
// try
// {
// // Convert to Mqtt Collection (serializable to Protobuf)
// var mqttCollection = new TrakHoundMqttEntityCollection(collection);

// // Convert to Protobuf
// byte[] protobufBytes;
// using (var inputStream1 = new MemoryStream())
// {
// Serializer.Serialize(inputStream1, mqttCollection);
// protobufBytes = inputStream1.ToArray();
// }

// // Compress to gzip
// byte[] compressedBytes;
// using (var inputStream2 = new MemoryStream())
// {
// using (var zip = new GZipStream(inputStream2, CompressionMode.Compress, true))
// {
// zip.Write(protobufBytes, 0, protobufBytes.Length);
// }
// compressedBytes = inputStream2.ToArray();
// }

// return compressedBytes;
// }
// catch { }
// }

// return null;
//}
private IEnumerable<IAsset> ProcessAssetPayload(byte[] payload)
{
if (!payload.IsNullOrEmpty())
{
try
{
var uncompressedBytes = payload;
if (!uncompressedBytes.IsNullOrEmpty())
{
var readResult = InputFormatter.CreateAssets(_configuration.DocumentFormat, uncompressedBytes);
if (readResult.Success)
{
return readResult.Content;
}
}
}
catch (Exception ex)
{

}
}

return null;
}

private IDevice ProcessDevicePayload(byte[] payload)
{
if (!payload.IsNullOrEmpty())
{
try
{
var uncompressedBytes = payload;
if (!uncompressedBytes.IsNullOrEmpty())
{
var readResult = InputFormatter.CreateDevice(_configuration.DocumentFormat, uncompressedBytes);
if (readResult.Success)
{
return readResult.Content;
}
}
}
catch (Exception ex)
{

}
}

return null;
}


private static string GetFilePath(string path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,92 @@ namespace MTConnect.Configurations
{
public class ModuleConfiguration
{
/// <summary>
/// The MQTT broker hostname
/// </summary>
public string Server { get; set; }

/// <summary>
/// The MQTT broker port number
/// </summary>
public int Port { get; set; }

public int Interval { get; set; }
/// <summary>
/// The timeout (in milliseconds) to use for connection and read/write
/// </summary>
public int Timeout { get; set; }

/// <summary>
/// The interval (in milliseconds) to delay between disconnections
/// </summary>
public int ReconnectInterval { get; set; }


/// <summary>
/// Sets the Username to use for authentication
/// </summary>
public string Username { get; set; }

/// <summary>
/// Sets the Password to use for authentication
/// </summary>
public string Password { get; set; }

/// <summary>
/// Sets the Client ID to use for the connection
/// </summary>
public string ClientId { get; set; }

/// <summary>
/// Sets the CleanSession flag (true or false)
/// </summary>
public bool CleanSession { get; set; }

/// <summary>
/// Sets the Quality Of Service (QoS) to use. 0 = At Most Once, 1 = At least Once, 2 = Exactly Once
/// </summary>
public int QoS { get; set; }


/// <summary>
/// The path to the Certificate Authority file
/// </summary>
public string CertificateAuthority { get; set; }

/// <summary>
/// The path to the PEM Certificate (.pem) file
/// </summary>
public string PemCertificate { get; set; }

/// <summary>
/// The path to the PEM Private Key file
/// </summary>
public string PemPrivateKey { get; set; }

/// <summary>
/// Sets whether to validate the certificate chain (true or false)
/// </summary>
public bool AllowUntrustedCertificates { get; set; }

/// <summary>
/// Sets whether to use TLS or not (true or false)
/// </summary>
public bool UseTls { get; set; }


public int RetryInterval { get; set; }

public string Topic { get; set; }
/// <summary>
/// The MQTT topic prefix to subscribe to
/// </summary>
public string TopicPrefix { get; set; }

/// <summary>
/// The UUID or Name of the Device to read data for
/// </summary>
public string DeviceKey { get; set; }

/// <summary>
/// The Document Format ID to use to format the input data
/// </summary>
public string DocumentFormat { get; set; }


Expand All @@ -48,7 +100,8 @@ public ModuleConfiguration()
Port = 1883;
QoS = 1;
CleanSession = true;
RetryInterval = 5000;
Timeout = 5000;
ReconnectInterval = 10000;
DocumentFormat = "json";
}
}
Expand Down
Loading

0 comments on commit 4484b1f

Please sign in to comment.