Skip to content

Commit

Permalink
Updated MQTT Client and Server
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickRitchie committed Dec 9, 2023
1 parent de75ac0 commit 2b71147
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 145 deletions.
4 changes: 2 additions & 2 deletions agent/Modules/MTConnect.NET-AgentModule-MqttBroker/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class Module : MTConnectAgentModule
private const string ModuleId = "MQTT Broker";

private readonly ModuleConfiguration _configuration;
private readonly MTConnectMqttServer _server;
private readonly MTConnectMqttDocumentServer _server;
private MqttServer _mqttServer;
private CancellationTokenSource _stop;

Expand All @@ -34,7 +34,7 @@ public Module(IMTConnectAgentBroker mtconnectAgent, object configuration) : base

_configuration = AgentApplicationConfiguration.GetConfiguration<ModuleConfiguration>(configuration);

_server = new MTConnectMqttServer(mtconnectAgent, _configuration);
_server = new MTConnectMqttDocumentServer(mtconnectAgent, _configuration);
_server.ProbeReceived += ProbeReceived;
_server.CurrentReceived += CurrentReceived;
_server.SampleReceived += SampleReceived;
Expand Down
27 changes: 18 additions & 9 deletions agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class Module : MTConnectAgentModule
private const string ModuleId = "MQTT Relay";

private readonly ModuleConfiguration _configuration;
private readonly MTConnectMqttServer _server;
private readonly MTConnectMqttDocumentServer _server;
private readonly MqttFactory _mqttFactory;
private readonly IMqttClient _mqttClient;
private CancellationTokenSource _stop;
Expand All @@ -37,7 +37,7 @@ public Module(IMTConnectAgentBroker mtconnectAgent, object configuration) : base

_configuration = AgentApplicationConfiguration.GetConfiguration<ModuleConfiguration>(configuration);

_server = new MTConnectMqttServer(mtconnectAgent, _configuration);
_server = new MTConnectMqttDocumentServer(mtconnectAgent, _configuration);
_server.ProbeReceived += ProbeReceived;
_server.CurrentReceived += CurrentReceived;
_server.SampleReceived += SampleReceived;
Expand Down Expand Up @@ -166,15 +166,18 @@ private async void ProbeReceived(IDevice device, IDevicesResponseDocument respon
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, responseDocument);
var x = new List<KeyValuePair<string, string>>();
x.Add(new KeyValuePair<string, string>("indentOutput", _configuration.IndentOutput.ToString()));

var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, responseDocument, x);
if (formatResult.Success)
{
var topic = $"{_configuration.TopicPrefix}/{_configuration.ProbeTopic}/{device.Uuid}";

var message = new MqttApplicationMessage();
message.Retain = true;
message.Topic = topic;
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce;
message.QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)_configuration.QoS;
message.Payload = formatResult.Content;

try
Expand All @@ -201,15 +204,18 @@ private async void CurrentReceived(IDevice device, IStreamsResponseOutputDocumen
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, ref responseDocument);
var x = new List<KeyValuePair<string, string>>();
x.Add(new KeyValuePair<string, string>("indentOutput", _configuration.IndentOutput.ToString()));

var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, ref responseDocument, x);
if (formatResult.Success)
{
var topic = $"{_configuration.TopicPrefix}/{_configuration.CurrentTopic}/{device.Uuid}";

var message = new MqttApplicationMessage();
//message.Retain = true;
message.Topic = topic;
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce;
message.QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)_configuration.QoS;
message.Payload = formatResult.Content;

try
Expand All @@ -236,15 +242,18 @@ private async void SampleReceived(IDevice device, IStreamsResponseOutputDocument
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, ref responseDocument);
var x = new List<KeyValuePair<string, string>>();
x.Add(new KeyValuePair<string, string>("indentOutput", _configuration.IndentOutput.ToString()));

var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, ref responseDocument, x);
if (formatResult.Success)
{
var topic = $"{_configuration.TopicPrefix}/{_configuration.SampleTopic}/{device.Uuid}";

var message = new MqttApplicationMessage();
//message.Retain = true;
message.Topic = topic;
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce;
message.QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)_configuration.QoS;
message.Payload = formatResult.Content;

try
Expand Down Expand Up @@ -281,7 +290,7 @@ private async void AssetReceived(IDevice device, IAssetsResponseDocument respons
var message = new MqttApplicationMessage();
message.Retain = true;
message.Topic = topic;
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce;
message.QualityOfServiceLevel = (MQTTnet.Protocol.MqttQualityOfServiceLevel)_configuration.QoS;
message.Payload = formatResult.Content;

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class ModuleConfiguration : IMTConnectMqttServerConfiguration

public string DocumentFormat { get; set; }

public bool IndentOutput { get; set; }


public string TopicPrefix { get; set; }

Expand Down
Loading

0 comments on commit 2b71147

Please sign in to comment.