From 9ac2a76dceacf020d69507b2571b5bbd419c430f Mon Sep 17 00:00:00 2001 From: Karl-Erik Gustafsson Date: Fri, 23 Feb 2024 17:11:23 +0200 Subject: [PATCH] feature: upgrade to aedes and new in-flight manager (#40) --- README.md | 15 +++- index.js | 234 +++++++++++++++++++++++++++++++++++++-------------- package.json | 8 +- 3 files changed, 187 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index afa0171..bead8df 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,17 @@ # Signal K - MQTT Gateway -Signal K Node server plugin that functions as a gateway between MQTT and SK server. You can -- run a local server that has all SK data available and that routes all data from other MQTT clients to Signal K deltas. The server is advertised via mdns/Bonjour -- connect to a MQTT server and send deltas you choose with chosen interval to `signalk/delta` +Signal K Node server plugin that functions as a gateway between MQTT and SK server. -![image](https://user-images.githubusercontent.com/1049678/28848552-0d624088-771c-11e7-963d-4a7761bfd2a4.png) +## Local MQTT broker/server +- All SK deltas data available from broker/server. The server is advertised via mdns/Bonjour if available +## MQTT Client +- Send user selectable deltas (vessels.self, all deltas, JSON deltas from selectable paths or alldetas and JSON deltas) to remote broker/server + +![image](https://github.com/KEGustafsson/signalk-mqtt-gw/assets/3332251/9e37d8f6-b043-4118-a1c7-0c581d01ffd3) + +![image](https://github.com/KEGustafsson/signalk-mqtt-gw/assets/3332251/445fdd5e-9277-4bab-ada0-b58bd02242ed) + +![image](https://github.com/KEGustafsson/signalk-mqtt-gw/assets/3332251/20b3ad30-1e48-4b4f-962f-5f64f70bd7e8) If you run a local server you can send data to the server like so: diff --git a/index.js b/index.js index a4a5e6d..7b47159 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ /* * Copyright 2016 Teppo Kurki - * + * Copyright 2024 Karl-Erik Gustafsson + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,22 +16,106 @@ */ const id = 'signalk-mqtt-gw'; -const debug = require('debug')(id); -const mosca = require('mosca'); const mqtt = require('mqtt'); -const NeDBStore = require('mqtt-nedb-store'); +const { Manager } = require("mqtt-jsonl-store"); -module.exports = function(app) { - var plugin = { +module.exports = function createPlugin(app) { + let plugin = { unsubscribes: [], }; - var server - plugin.id = id; plugin.name = 'Signal K - MQTT Gateway'; - plugin.description = - 'plugin that provides gateway functionality between Signal K and MQTT'; + plugin.description = 'Plugin that provides gateway functionality between Signal K and MQTT'; + + let server; + let aedes; + let ad; + let client; + let manager; + const setStatus = app.setPluginStatus || app.setProviderStatus; + let localServerMessage = 'not running'; + let remoteServerMessage = 'not running'; + + plugin.start = function (options) { + app.debug("Aedes MQTT Plugin Started"); + statusUpdate(); + plugin.onStop = []; + + if (options.runLocalServer) { + startLocalServer(options, plugin.onStop); + localServerMessage = 'running on port ' + options.port; + statusUpdate(); + } + if (options.sendToRemote) { + manager = new Manager(app.getDataDirPath()); + startMqttClient(manager,plugin.onStop); + } + async function startMqttClient(manager) { + await manager.open(); + client = mqtt.connect(options.remoteHost, { + rejectUnauthorized: options.rejectUnauthorized, + reconnectPeriod: 60000, + clientId: app.selfId, + incomingStore: manager.incoming, + outgoingStore: manager.outgoing, + username: options.username, + password: options.password + }); + client.on('error', (err) => console.error(err)) + + let deltaHandler = undefined; + if (options.selectedOption === '1) vessels.self') { + deltaHandler = (delta) => publishRemoteDelta(delta, client, false) + remoteServerMessage = 'vessels.self to ' + options.remoteHost; + } + else if (options.selectedOption === '2) all deltas') { + deltaHandler = (delta) => publishRemoteDelta(delta, client, true) + remoteServerMessage = 'all deltas to ' + options.remoteHost; + } + else if (options.selectedOption === '3) self paths in JSON format') { + startSending(options, client, plugin.onStop); + remoteServerMessage = 'JSON to ' + options.remoteHost; + } + else if (options.selectedOption === '4) all deltas + JSON') { + startSending(options, client, plugin.onStop); + deltaHandler = (delta) => publishRemoteDelta(delta, client, true); + remoteServerMessage = 'all deltas and JSON to ' + options.remoteHost; + } + if (deltaHandler) { + app.signalk.on('delta', deltaHandler); + } + + statusUpdate(); + + plugin.onStop.push(_ => { + client.end() + stopManager() + deltaHandler && app.signalk.removeListener('delta', deltaHandler); + }); + } + }; + + async function stopManager() { + try { + await manager.close(); + app.debug('manager closed') + } catch (error) {} + } + + plugin.stop = function stop() { + plugin.onStop.forEach(f => f()); + plugin.onStop = []; + if (server) { + server.close(); + aedes.close(); + if (ad) { + ad.stop(); + } + } + app.debug("Aedes MQTT Plugin Stopped"); + }; + plugin.schema = { title: 'Signal K - MQTT Gateway', type: 'object', @@ -71,9 +156,16 @@ module.exports = function(app) { default: false, title: "Reject self signed and invalid server certificates" }, + selectedOption: { + type: "string", + title: "Data to send to remote server", + enum: ["1) vessels.self", "2) all deltas", "3) self paths in JSON format", "4) all deltas + JSON"], + description: 'Select the type of data to send to the remote server', + default: "1) vessels.self" + }, paths: { type: 'array', - title: 'Signal K self paths to send', + title: 'Signal K self paths to send (JSON format), selection 3) or 4) above', default: [{ path: 'navigation.position', interval: 60 }], items: { type: 'object', @@ -93,39 +185,15 @@ module.exports = function(app) { }, }; - var started = false; - var ad; - - plugin.onStop = []; - - plugin.start = function(options) { - plugin.onStop = []; - - if (options.runLocalServer) { - startLocalServer(options, plugin.onStop); - } - if (options.sendToRemote) { - const manager = NeDBStore(app.getDataDirPath()); - const client = mqtt.connect(options.remoteHost, { - rejectUnauthorized: options.rejectUnauthorized, - reconnectPeriod: 60000, - clientId: app.selfId, - outgoingStore: manager.outgoing, - username: options.username, - password: options.password - }); - client.on('error', (err) => console.error(err)) - startSending(options, client, plugin.onStop); - plugin.onStop.push(_ => client.end()); - } - started = true; - }; - - plugin.stop = function() { - plugin.onStop.forEach(f => f()); - }; + function outputMessages() { + setImmediate(() => + app.reportOutputMessages() + ) + } - return plugin; + function statusUpdate () { + setStatus(`Broker: ${localServerMessage}, Client: ${remoteServerMessage}`) + } function startSending(options, client, onStop) { options.paths.forEach(pathInterval => { @@ -133,7 +201,8 @@ module.exports = function(app) { app.streambundle .getSelfBus(pathInterval.path) .debounceImmediate(pathInterval.interval * 1000) - .onValue(normalizedPathValue => + .onValue(normalizedPathValue => { + outputMessages(); client.publish( 'signalk/delta', JSON.stringify({ @@ -153,22 +222,57 @@ module.exports = function(app) { }), { qos: 1 } ) - ) + }) ); }); } + function publishRemoteDelta(delta, client, allDelta) { + if (allDelta) { + publishDelta(delta, client); + } else { + if (delta.context === app.selfContext) { + publishDelta(delta, client); + } + } + } + + function publishDelta(delta, client) { + const prefix = + (delta.context === app.selfContext + ? 'vessels/self' + : delta.context.replace('.', '/')) + '/'; + (delta.updates || []).forEach(update => { + (update.values || []).forEach(pathValue => { + client.publish( + prefix + pathValue.path.replace(/\./g, '/'), + pathValue.value === null ? 'null' : toText(pathValue.value), + { qos: 1 } + ) + }); + }); + outputMessages(); + } + function startLocalServer(options, onStop) { - server = new mosca.Server(options); + aedes = require('aedes')(); + server = require('net').createServer(aedes.handle) + const port = options.port || 1883; + + server.listen(port, function() { + app.debug('Aedes MQTT server is up and running on port', port) + onReady() + }) app.signalk.on('delta', publishLocalDelta); onStop.push(_ => { app.signalk.removeListener('delta', publishLocalDelta) }); - server.on('clientConnected', function(client) { - console.log('client connected', client.id); + aedes.on('client', function(client) { + app.debug('client connected', client.id); }); - server.on('published', function(packet, client) { + aedes.on('publish', async function(packet, client) { + app.debug('Published', packet.topic, packet.payload.toString()); if (client) { var skData = extractSkData(packet); if (skData.valid) { @@ -177,23 +281,25 @@ module.exports = function(app) { } }); - server.on('ready', onReady); - // server.on('error', (err) => { - // app.error(err) - // }) - function onReady() { try { const mdns = require('mdns'); ad = mdns.createAdvertisement(mdns.tcp('mqtt'), options.port); ad.start(); + app.debug( + 'MQTT server is advertised on mDNS as mqtt.tcp://:' + options.port + ); } catch (e) { console.error(e.message); } - console.log( - 'Mosca MQTT server is up and running on port ' + options.port - ); - onStop.push(_ => { server.close() }); + + onStop.push(_ => { + server.close() + aedes.close() + if (ad) { + ad.stop(); + } + }); } } @@ -204,7 +310,7 @@ module.exports = function(app) { : delta.context.replace('.', '/')) + '/'; (delta.updates || []).forEach(update => { (update.values || []).forEach(pathValue => { - server.publish({ + aedes.publish({ topic: prefix + pathValue.path.replace(/\./g, '/'), payload: pathValue.value === null ? 'null' : toText(pathValue.value), @@ -216,10 +322,12 @@ module.exports = function(app) { } function toText(value) { - if (typeof value === 'object') { - return JSON.stringify(value) + if (typeof value !== 'undefined') { + if (typeof value === 'object') { + return JSON.stringify(value) + } + return value.toString() } - return value.toString() } function extractSkData(packet) { @@ -259,4 +367,6 @@ module.exports = function(app) { ], }; } + + return plugin; }; diff --git a/package.json b/package.json index 008a4bc..9e21796 100644 --- a/package.json +++ b/package.json @@ -12,12 +12,12 @@ "author": "teppo.kurki@iki,fi", "license": "ISC", "dependencies": { + "aedes": "^0.51.0", "debug": "^2.3.2", - "mosca": "^2.2.0", - "mqtt-nedb-store": "^0.1.0", - "jsonschema": "1.2.6" + "mqtt": "^5.3.5", + "mqtt-jsonl-store": "^0.2.1" }, "optionalDependencies": { - "mdns": "^2.3.3" + "mdns": "^2.7.2" } }