Skip to content

Commit

Permalink
feature: upgrade to aedes and new in-flight manager (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
KEGustafsson authored Feb 23, 2024
1 parent 85acd09 commit 9ac2a76
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 70 deletions.
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
# Signal K - MQTT Gateway

Signal K Node server plugin that functions as a gateway between MQTT and SK server. You can
- run a local server that has all SK data available and that routes all data from other MQTT clients to Signal K deltas. The server is advertised via mdns/Bonjour
- connect to a MQTT server and send deltas you choose with chosen interval to `signalk/delta`
Signal K Node server plugin that functions as a gateway between MQTT and SK server.

![image](https://user-images.githubusercontent.com/1049678/28848552-0d624088-771c-11e7-963d-4a7761bfd2a4.png)
## Local MQTT broker/server
- All SK deltas data available from broker/server. The server is advertised via mdns/Bonjour if available
## MQTT Client
- Send user selectable deltas (vessels.self, all deltas, JSON deltas from selectable paths or alldetas and JSON deltas) to remote broker/server

![image](https://github.com/KEGustafsson/signalk-mqtt-gw/assets/3332251/9e37d8f6-b043-4118-a1c7-0c581d01ffd3)

![image](https://github.com/KEGustafsson/signalk-mqtt-gw/assets/3332251/445fdd5e-9277-4bab-ada0-b58bd02242ed)

![image](https://github.com/KEGustafsson/signalk-mqtt-gw/assets/3332251/20b3ad30-1e48-4b4f-962f-5f64f70bd7e8)


If you run a local server you can send data to the server like so:
Expand Down
234 changes: 172 additions & 62 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright 2016 Teppo Kurki <[email protected]>
*
* Copyright 2024 Karl-Erik Gustafsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -15,22 +16,106 @@
*/

const id = 'signalk-mqtt-gw';
const debug = require('debug')(id);
const mosca = require('mosca');
const mqtt = require('mqtt');
const NeDBStore = require('mqtt-nedb-store');
const { Manager } = require("mqtt-jsonl-store");

module.exports = function(app) {
var plugin = {
module.exports = function createPlugin(app) {
let plugin = {
unsubscribes: [],
};
var server

plugin.id = id;
plugin.name = 'Signal K - MQTT Gateway';
plugin.description =
'plugin that provides gateway functionality between Signal K and MQTT';
plugin.description = 'Plugin that provides gateway functionality between Signal K and MQTT';

let server;
let aedes;
let ad;
let client;
let manager;
const setStatus = app.setPluginStatus || app.setProviderStatus;
let localServerMessage = 'not running';
let remoteServerMessage = 'not running';

plugin.start = function (options) {
app.debug("Aedes MQTT Plugin Started");
statusUpdate();
plugin.onStop = [];

if (options.runLocalServer) {
startLocalServer(options, plugin.onStop);
localServerMessage = 'running on port ' + options.port;
statusUpdate();
}
if (options.sendToRemote) {
manager = new Manager(app.getDataDirPath());
startMqttClient(manager,plugin.onStop);
}
async function startMqttClient(manager) {
await manager.open();
client = mqtt.connect(options.remoteHost, {
rejectUnauthorized: options.rejectUnauthorized,
reconnectPeriod: 60000,
clientId: app.selfId,
incomingStore: manager.incoming,
outgoingStore: manager.outgoing,
username: options.username,
password: options.password
});
client.on('error', (err) => console.error(err))

let deltaHandler = undefined;
if (options.selectedOption === '1) vessels.self') {
deltaHandler = (delta) => publishRemoteDelta(delta, client, false)
remoteServerMessage = 'vessels.self to ' + options.remoteHost;
}
else if (options.selectedOption === '2) all deltas') {
deltaHandler = (delta) => publishRemoteDelta(delta, client, true)
remoteServerMessage = 'all deltas to ' + options.remoteHost;
}
else if (options.selectedOption === '3) self paths in JSON format') {
startSending(options, client, plugin.onStop);
remoteServerMessage = 'JSON to ' + options.remoteHost;
}
else if (options.selectedOption === '4) all deltas + JSON') {
startSending(options, client, plugin.onStop);
deltaHandler = (delta) => publishRemoteDelta(delta, client, true);
remoteServerMessage = 'all deltas and JSON to ' + options.remoteHost;
}

if (deltaHandler) {
app.signalk.on('delta', deltaHandler);
}

statusUpdate();

plugin.onStop.push(_ => {
client.end()
stopManager()
deltaHandler && app.signalk.removeListener('delta', deltaHandler);
});
}
};

async function stopManager() {
try {
await manager.close();
app.debug('manager closed')
} catch (error) {}
}

plugin.stop = function stop() {
plugin.onStop.forEach(f => f());
plugin.onStop = [];
if (server) {
server.close();
aedes.close();
if (ad) {
ad.stop();
}
}
app.debug("Aedes MQTT Plugin Stopped");
};

plugin.schema = {
title: 'Signal K - MQTT Gateway',
type: 'object',
Expand Down Expand Up @@ -71,9 +156,16 @@ module.exports = function(app) {
default: false,
title: "Reject self signed and invalid server certificates"
},
selectedOption: {
type: "string",
title: "Data to send to remote server",
enum: ["1) vessels.self", "2) all deltas", "3) self paths in JSON format", "4) all deltas + JSON"],
description: 'Select the type of data to send to the remote server',
default: "1) vessels.self"
},
paths: {
type: 'array',
title: 'Signal K self paths to send',
title: 'Signal K self paths to send (JSON format), selection 3) or 4) above',
default: [{ path: 'navigation.position', interval: 60 }],
items: {
type: 'object',
Expand All @@ -93,47 +185,24 @@ module.exports = function(app) {
},
};

var started = false;
var ad;

plugin.onStop = [];

plugin.start = function(options) {
plugin.onStop = [];

if (options.runLocalServer) {
startLocalServer(options, plugin.onStop);
}
if (options.sendToRemote) {
const manager = NeDBStore(app.getDataDirPath());
const client = mqtt.connect(options.remoteHost, {
rejectUnauthorized: options.rejectUnauthorized,
reconnectPeriod: 60000,
clientId: app.selfId,
outgoingStore: manager.outgoing,
username: options.username,
password: options.password
});
client.on('error', (err) => console.error(err))
startSending(options, client, plugin.onStop);
plugin.onStop.push(_ => client.end());
}
started = true;
};

plugin.stop = function() {
plugin.onStop.forEach(f => f());
};
function outputMessages() {
setImmediate(() =>
app.reportOutputMessages()
)
}

return plugin;
function statusUpdate () {
setStatus(`Broker: ${localServerMessage}, Client: ${remoteServerMessage}`)
}

function startSending(options, client, onStop) {
options.paths.forEach(pathInterval => {
onStop.push(
app.streambundle
.getSelfBus(pathInterval.path)
.debounceImmediate(pathInterval.interval * 1000)
.onValue(normalizedPathValue =>
.onValue(normalizedPathValue => {
outputMessages();
client.publish(
'signalk/delta',
JSON.stringify({
Expand All @@ -153,22 +222,57 @@ module.exports = function(app) {
}),
{ qos: 1 }
)
)
})
);
});
}

function publishRemoteDelta(delta, client, allDelta) {
if (allDelta) {
publishDelta(delta, client);
} else {
if (delta.context === app.selfContext) {
publishDelta(delta, client);
}
}
}

function publishDelta(delta, client) {
const prefix =
(delta.context === app.selfContext
? 'vessels/self'
: delta.context.replace('.', '/')) + '/';
(delta.updates || []).forEach(update => {
(update.values || []).forEach(pathValue => {
client.publish(
prefix + pathValue.path.replace(/\./g, '/'),
pathValue.value === null ? 'null' : toText(pathValue.value),
{ qos: 1 }
)
});
});
outputMessages();
}

function startLocalServer(options, onStop) {
server = new mosca.Server(options);
aedes = require('aedes')();
server = require('net').createServer(aedes.handle)
const port = options.port || 1883;

server.listen(port, function() {
app.debug('Aedes MQTT server is up and running on port', port)
onReady()
})

app.signalk.on('delta', publishLocalDelta);
onStop.push(_ => { app.signalk.removeListener('delta', publishLocalDelta) });

server.on('clientConnected', function(client) {
console.log('client connected', client.id);
aedes.on('client', function(client) {
app.debug('client connected', client.id);
});

server.on('published', function(packet, client) {
aedes.on('publish', async function(packet, client) {
app.debug('Published', packet.topic, packet.payload.toString());
if (client) {
var skData = extractSkData(packet);
if (skData.valid) {
Expand All @@ -177,23 +281,25 @@ module.exports = function(app) {
}
});

server.on('ready', onReady);
// server.on('error', (err) => {
// app.error(err)
// })

function onReady() {
try {
const mdns = require('mdns');
ad = mdns.createAdvertisement(mdns.tcp('mqtt'), options.port);
ad.start();
app.debug(
'MQTT server is advertised on mDNS as mqtt.tcp://<hostname>:' + options.port
);
} catch (e) {
console.error(e.message);
}
console.log(
'Mosca MQTT server is up and running on port ' + options.port
);
onStop.push(_ => { server.close() });

onStop.push(_ => {
server.close()
aedes.close()
if (ad) {
ad.stop();
}
});
}
}

Expand All @@ -204,7 +310,7 @@ module.exports = function(app) {
: delta.context.replace('.', '/')) + '/';
(delta.updates || []).forEach(update => {
(update.values || []).forEach(pathValue => {
server.publish({
aedes.publish({
topic: prefix + pathValue.path.replace(/\./g, '/'),
payload:
pathValue.value === null ? 'null' : toText(pathValue.value),
Expand All @@ -216,10 +322,12 @@ module.exports = function(app) {
}

function toText(value) {
if (typeof value === 'object') {
return JSON.stringify(value)
if (typeof value !== 'undefined') {
if (typeof value === 'object') {
return JSON.stringify(value)
}
return value.toString()
}
return value.toString()
}

function extractSkData(packet) {
Expand Down Expand Up @@ -259,4 +367,6 @@ module.exports = function(app) {
],
};
}

return plugin;
};
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
"author": "teppo.kurki@iki,fi",
"license": "ISC",
"dependencies": {
"aedes": "^0.51.0",
"debug": "^2.3.2",
"mosca": "^2.2.0",
"mqtt-nedb-store": "^0.1.0",
"jsonschema": "1.2.6"
"mqtt": "^5.3.5",
"mqtt-jsonl-store": "^0.2.1"
},
"optionalDependencies": {
"mdns": "^2.3.3"
"mdns": "^2.7.2"
}
}

0 comments on commit 9ac2a76

Please sign in to comment.