Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Project Nodes multiple MQTT connections issue #66

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 83 additions & 26 deletions nodes/project-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,10 @@ module.exports = function (RED) {
// mqtt encapsulation
const mqtt = (function () {
const allNodes = new Set()
/** @type {MQTT.MqttClient} */
/** @type {MQTT.MqttClient} The Single MQTT client connection */
let client
/** @type {MQTT.MqttClient[]} An array to track and auto kill multiple client connections */
const clients = []
let connected = false
let connecting = false
let closing = false
Expand Down Expand Up @@ -274,6 +276,7 @@ module.exports = function (RED) {
})
}
function onConnect (/** @type {MQTT.IConnackPacket} */ packet) {
checkAndContain() // check and contain multiple clients
connAck.properties = packet.properties
connAck.reasonCode = packet.reasonCode
connAck.returnCode = packet.returnCode
Expand All @@ -288,7 +291,8 @@ module.exports = function (RED) {
})
}
function onReconnect () {
RED.log.trace('Project Link nodes reconnecting')
RED.log.info('Project Link nodes reconnecting')
checkAndContain() // check and contain multiple clients
allNodes.forEach(node => {
try {
node.status({ fill: 'yellow', shape: 'dot', text: 'reconnecting' })
Expand All @@ -305,6 +309,7 @@ module.exports = function (RED) {
reasonCode: rc,
reasonString: rs
}
checkAndContain() // check and contain multiple clients
connected = false
connecting = false
closing = false
Expand All @@ -317,6 +322,7 @@ module.exports = function (RED) {
}
// Register disconnect handlers
function onClose (err) {
checkAndContain() // check and contain multiple clients
if (err instanceof Error) {
RED.log.trace(`Project link connection closed: ${err.message}`)
allNodes.forEach(node => {
Expand Down Expand Up @@ -376,10 +382,43 @@ module.exports = function (RED) {
clientListeners = clientListeners.filter((l) => {
if (event && event !== l.event) { return true }
if (handler && handler !== l.handler) { return true }
client.removeListener(l.event, l.handler)
client && client.removeListener(l.event, l.handler)
return false // found and removed, filter out this one
})
}

/**
* Check all clients and closes any that are NOT the current client (i.e. keeps a max of 1 client open at any time)
* If the current client is closed/undefined, then any/all connections found in the clients array will be closed
* Lastly, the clients array is re-synced to contain only the current client (if it exists)
*/
function checkAndContain () {
// There should only ever be one client, but due to async nature of
// user operations, node-red on(...) events, mqtt auto reconnect,
// network outages and other timing difficulties, it has proven possible to generate multiple clients.
// Therefore, whenever we create a connection, we add it to the clients array (in the `connect()` function)
// That way, we can proactively check all clients and close any that are not the current client
// This is a low cost operation and will only be called when a connect/reconnect/disconnect callback is made
for (let i = 0; i < clients.length; i++) {
if (clients[i] && clients[i] !== client) {
if (client) {
// if we have a client, but this is not it, log the fact we caught a non-current client
RED.log.warn('Project link nodes: cleaning up non current client')
}
try {
clients[i].removeAllListeners()
clients[i].end(true)
} finally {
clients[i] = null
}
}
}
// re-sync the clients array if required
clients.length = 0
if (client) {
clients.push(client)
}
}
return { // public interface
subscribe (node, topic, options, callback) {
if (!isValidSubscriptionTopic(topic)) {
Expand Down Expand Up @@ -526,6 +565,17 @@ module.exports = function (RED) {
connecting = true
off() // close existing event handlers to be safe from duplicates (re-wired after connection issued)

if (client) {
// if client is something, force it to end unconditionally
RED.log.trace('force end() project node client before new connect')
try {
client.removeAllListeners()
client.end(true)
} finally {
client = null
}
}

/** @type {MQTT.IClientOptions} */
const defaultOptions = {
protocolVersion: 5,
Expand Down Expand Up @@ -576,6 +626,7 @@ module.exports = function (RED) {
const newURL = new URL(brokerURL)
parsedURL.hostname = newURL.hostname
client = MQTT.connect(parsedURL, options)
clients.push(client) // add to clients array for containment and auto cleanup of multiple clients
on('connect', onConnect)
on('error', onError)
on('close', onClose)
Expand All @@ -588,52 +639,57 @@ module.exports = function (RED) {
}
},
disconnect (done) {
const closeMessage = null // FUTURE: Let broker/clients know of issue via close msg
const _callback = function (err) {
connecting = false
connected = false
closing = false
if (err) {
RED.log.warn(`Project Link nodes disconnect error: ${err.message}`)
}
// By this point, the client will mostly likely have been ended cleanly
// however, there is no harm in forcing it to end here and so far, this
// solves the majority of multiple connect/disconnect issues witnessed.
if (client) {
try {
client.removeAllListeners()
client.end(true) // force end, most likely already ended (cleanly)
} catch (_err) {
// do nothing
}
}
done && typeof done === 'function' && done(err)
}
if (!client) { return _callback() }

const waitEnd = (client, ms) => {
return new Promise((resolve, reject) => {
closing = true
if (!client || !connected) {
if (!client) {
resolve()
} else {
const t = setTimeout(() => {
if (!connected) {
resolve()
} else {
// clean end() has exceeded WAIT_END, lets force end!
client && client.end(true)
reject(new Error('timeout'))
reject(new Error('timeout waiting for client clean end'))
}
}, ms)
client.end(() => {
client.end((err) => {
clearTimeout(t)
resolve()
if (err) {
reject(err)
} else {
resolve()
}
})
}
})
}
if (connected && closeMessage) {
mqtt.publish(closeMessage, function (err) {
waitEnd(client, 2000).then(() => {
_callback(err)
}).catch((e) => {
_callback(e)
})
})
} else {
waitEnd(client, 2000).then(() => {
_callback()
}).catch((_e) => {
_callback()
})
}

waitEnd(client, 2000).then(() => {
_callback()
}).catch((_e) => {
_callback(_e)
})
},
close (done) {
topicCallbackMap.forEach(callbacks => {
Expand All @@ -646,6 +702,7 @@ module.exports = function (RED) {
mqtt.disconnect((err) => {
off()
client = null
checkAndContain()
done(err)
})
},
Expand Down
Loading