-
-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1ddf8dc
commit 9ad194a
Showing
5 changed files
with
266 additions
and
176 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
107 changes: 0 additions & 107 deletions
107
agent/Modules/MTConnect.NET-MqttBroker-Module/MTConnectMqttBrokerModule.cs
This file was deleted.
Oops, something went wrong.
197 changes: 197 additions & 0 deletions
197
agent/Modules/MTConnect.NET-MqttBroker-Module/Module.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
// Copyright (c) 2023 TrakHound Inc., All Rights Reserved. | ||
// TrakHound Inc. licenses this file to you under the MIT license. | ||
|
||
using MQTTnet; | ||
using MQTTnet.Server; | ||
using MTConnect.Agents; | ||
using MTConnect.Assets; | ||
using MTConnect.Configurations; | ||
using MTConnect.Devices; | ||
using MTConnect.Formatters; | ||
using MTConnect.Streams.Output; | ||
using System; | ||
using System.IO; | ||
using System.Security.Cryptography.X509Certificates; | ||
using System.Threading.Tasks; | ||
|
||
namespace MTConnect.Modules | ||
{ | ||
public class Module : MTConnectAgentModule | ||
{ | ||
public const string ConfigurationTypeId = "mqtt-broker"; | ||
|
||
//private readonly Logger _httpLogger = LogManager.GetLogger("http-logger"); | ||
//private readonly Logger _agentValidationLogger = LogManager.GetLogger("agent-validation-logger"); | ||
private readonly ModuleConfiguration _configuration; | ||
private readonly MTConnectMqttServer _server; | ||
//private IMTConnectAgentBroker _mtconnectAgent; | ||
private MqttServer _mqttServer; | ||
|
||
|
||
public Module(IMTConnectAgentBroker mtconnectAgent, object configuration) : base(mtconnectAgent) | ||
{ | ||
_configuration = AgentApplicationConfiguration.GetConfiguration<ModuleConfiguration>(configuration); | ||
|
||
_server = new MTConnectMqttServer(mtconnectAgent, _configuration); | ||
_server.ProbeReceived += ProbeReceived; | ||
_server.CurrentReceived += CurrentReceived; | ||
_server.SampleReceived += SampleReceived; | ||
_server.AssetReceived += AssetReceived; | ||
} | ||
|
||
|
||
protected override void OnStartBeforeLoad() | ||
{ | ||
StartAsync().Wait(); | ||
//_ = Task.Run(StartAsync); | ||
} | ||
|
||
protected override void OnStop() | ||
{ | ||
if (_mqttServer != null) _mqttServer.StopAsync(); | ||
} | ||
|
||
|
||
private async Task StartAsync() | ||
{ | ||
var mqttServerOptionsBuilder = new MqttServerOptionsBuilder().WithDefaultEndpoint(); | ||
|
||
// Add Certificate & Private Key | ||
if (!string.IsNullOrEmpty(_configuration.PemCertificate) && !string.IsNullOrEmpty(_configuration.PemPrivateKey)) | ||
{ | ||
X509Certificate2 certificate = null; | ||
|
||
#if NET5_0_OR_GREATER | ||
certificate = new X509Certificate2(X509Certificate2.CreateFromPemFile(GetFilePath(_configuration.PemCertificate), GetFilePath(_configuration.PemPrivateKey)).Export(X509ContentType.Pfx)); | ||
#endif | ||
|
||
if (certificate != null) | ||
{ | ||
mqttServerOptionsBuilder.WithoutDefaultEndpoint(); | ||
mqttServerOptionsBuilder.WithEncryptedEndpoint(); | ||
mqttServerOptionsBuilder.WithEncryptedEndpointPort(_configuration.Port); | ||
mqttServerOptionsBuilder.WithEncryptionCertificate(certificate); | ||
mqttServerOptionsBuilder.WithEncryptionSslProtocol(System.Security.Authentication.SslProtocols.Tls12); | ||
} | ||
} | ||
|
||
var mqttServerOptions = mqttServerOptionsBuilder.Build(); | ||
|
||
var mqttFactory = new MqttFactory(); | ||
_mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions); | ||
|
||
_mqttServer.ClientConnectedAsync += async (args) => | ||
{ | ||
//if (ClientConnected != null) ClientConnected.Invoke(this, new EventArgs()); | ||
}; | ||
_mqttServer.ClientDisconnectedAsync += async (args) => | ||
{ | ||
//if (ClientDisconnected != null) ClientDisconnected.Invoke(this, new EventArgs()); | ||
}; | ||
|
||
await _mqttServer.StartAsync(); | ||
_server.Start(); | ||
} | ||
|
||
|
||
private async void ProbeReceived(IDevice device, IDevicesResponseDocument responseDocument) | ||
{ | ||
if (_mqttServer != null && _mqttServer.IsStarted) | ||
{ | ||
var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, responseDocument); | ||
if (formatResult.Success) | ||
{ | ||
var topic = $"{_configuration.TopicPrefix}/{device.Uuid}/{_configuration.ProbeTopic}"; | ||
|
||
var message = new MqttApplicationMessage(); | ||
message.Retain = true; | ||
message.Topic = topic; | ||
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce; | ||
message.Payload = formatResult.Content; | ||
|
||
var injectMessage = new InjectedMqttApplicationMessage(message); | ||
|
||
await _mqttServer.InjectApplicationMessage(injectMessage); | ||
} | ||
} | ||
} | ||
|
||
private async void CurrentReceived(IDevice device, IStreamsResponseOutputDocument responseDocument) | ||
{ | ||
if (_mqttServer != null && _mqttServer.IsStarted) | ||
{ | ||
var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, ref responseDocument); | ||
if (formatResult.Success) | ||
{ | ||
var topic = $"{_configuration.TopicPrefix}/{device.Uuid}/{_configuration.CurrentTopic}"; | ||
|
||
var message = new MqttApplicationMessage(); | ||
message.Retain = true; | ||
message.Topic = topic; | ||
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce; | ||
message.Payload = formatResult.Content; | ||
|
||
var injectMessage = new InjectedMqttApplicationMessage(message); | ||
|
||
await _mqttServer.InjectApplicationMessage(injectMessage); | ||
} | ||
} | ||
} | ||
|
||
private async void SampleReceived(IDevice device, IStreamsResponseOutputDocument responseDocument) | ||
{ | ||
if (_mqttServer != null && _mqttServer.IsStarted) | ||
{ | ||
var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, ref responseDocument); | ||
if (formatResult.Success) | ||
{ | ||
var topic = $"{_configuration.TopicPrefix}/{device.Uuid}/{_configuration.SampleTopic}"; | ||
|
||
var message = new MqttApplicationMessage(); | ||
//message.Retain = true; | ||
message.Topic = topic; | ||
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce; | ||
message.Payload = formatResult.Content; | ||
|
||
var injectMessage = new InjectedMqttApplicationMessage(message); | ||
|
||
await _mqttServer.InjectApplicationMessage(injectMessage); | ||
} | ||
} | ||
} | ||
|
||
private async void AssetReceived(IDevice device, IAssetsResponseDocument responseDocument) | ||
{ | ||
if (_mqttServer != null && _mqttServer.IsStarted) | ||
{ | ||
var formatResult = ResponseDocumentFormatter.Format(_configuration.DocumentFormat, responseDocument); | ||
if (formatResult.Success) | ||
{ | ||
var topic = $"{_configuration.TopicPrefix}/{device.Uuid}/{_configuration.AssetTopic}"; | ||
|
||
var message = new MqttApplicationMessage(); | ||
message.Retain = true; | ||
message.Topic = topic; | ||
message.QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce; | ||
message.Payload = formatResult.Content; | ||
|
||
var injectMessage = new InjectedMqttApplicationMessage(message); | ||
|
||
await _mqttServer.InjectApplicationMessage(injectMessage); | ||
} | ||
} | ||
} | ||
|
||
|
||
private static string GetFilePath(string path) | ||
{ | ||
var x = path; | ||
if (!Path.IsPathRooted(x)) | ||
{ | ||
x = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, x); | ||
} | ||
|
||
return x; | ||
} | ||
} | ||
} |
Oops, something went wrong.