Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for communicating with app assigned devices #55

Merged
merged 4 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A collection of Node-RED nodes for easy communication between Node-RED instances
running in the [FlowFuse platform](https://flowfuse.com).

These nodes act in a similar way to the core Node-RED Link nodes - but can be
used to send and receive messages between different Node-RED instances.
used to send and receive messages between different Node-RED instances and devices.

Whilst these nodes are published under the Apache-2.0 license, they can only be
used with an instance of the FlowFuse platform with an active EE license applied.
Expand All @@ -15,6 +15,7 @@ This can be safely ignored.
### Prerequisites

- FlowFuse 0.8+ running with an active EE license and its integrated MQTT Broker
- FlowFuse 1.14+ for communicating with application assigned devices

Alternatively, you can [sign up to FlowFuse Cloud](https://flowfuse.com/product/)
now to try these nodes out.
Expand Down
10 changes: 5 additions & 5 deletions nodes/project-link.html
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<label><span>&nbsp</span></label>
<span class="ff-project-link-group-option">
<input type="radio" id="ff-project-link-radio-input-broadcast" name="ff-project-link-broadcast" value="true" />
<label for="ff-project-link-radio-input-broadcast">Broadcast message to all instances</label>
<label for="ff-project-link-radio-input-broadcast">Broadcast message</label>
</span>
</div>
<div class="form-row ff-project-link-topic-row">
Expand Down Expand Up @@ -219,7 +219,7 @@
// broadcast not permitted in link call at this time but has been
// considered in the code base - possible future iteration
if (nodeType === 'project link in') {
el.append(new Option('all instances', 'all', false, val === 'all'))
el.append(new Option('all instances and devices', 'all', false, val === 'all'))
}
const projects = (data.count ? data.projects : null) || []
for (let index = 0; index < projects.length; index++) {
Expand Down Expand Up @@ -366,7 +366,7 @@
<script type="text/html" data-help-name="project link in">
<p>Receive messages from other Node-RED instances within your FlowFuse Team</p>
<h3>Details</h3>
<p>This node can either listen for messages broadcast by other instances,
<p>This node can either listen for messages broadcast by other instances and devices,
or listen for messages sent directly to this instance.</p>
<p>The node is configured with a <code>topic</code> to listen on. This works
like an MQTT topic - allowing projects to send messages targeting different
Expand Down Expand Up @@ -404,12 +404,12 @@ <h3>Details</h3>
It provides three modes of operation:
<ul>
<li>send messages to another instance</li>
<li>broadcast messages to any instance listening on the same topic</li>
<li>broadcast messages to any instance or device listening on the same topic</li>
<li>return the message to its sender if it originated from a Project Call node</li>
</ul>
</p>
<p>
When configured to send or broadcast messages to other instances, the node
When configured to send or broadcast messages, the node
is configured with a <code>topic</code> to send on. This works
like an MQTT topic - allowing instances to send messages targeting different
subscribers.
Expand Down
94 changes: 67 additions & 27 deletions nodes/project-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module.exports = function (RED) {
'use strict'

// Do not register nodes in runtime if settings not provided
if (!RED.settings.flowforge || !RED.settings.flowforge.projectID || !RED.settings.flowforge.teamID || !RED.settings.flowforge.projectLink) {
if (!RED.settings.flowforge || !(RED.settings.flowforge.projectID || RED.settings.flowforge.applicationID) || !RED.settings.flowforge.teamID || !RED.settings.flowforge.projectLink) {
throw new Error('Project Link nodes cannot be loaded outside of an FlowFuse EE environment')
}

Expand All @@ -16,6 +16,7 @@ module.exports = function (RED) {
const API_VERSION = 'v1'
const TOPIC_HEADER = 'ff'
const TOPIC_VERSION = 'v1'
const OWNER_TYPE = RED.settings.flowforge.applicationID ? 'application' : 'instance'

// #region JSDoc

Expand Down Expand Up @@ -133,40 +134,53 @@ module.exports = function (RED) {
return result
}

function buildLinkTopic (node, project, subTopic, broadcast, responseTopic) {
function buildLinkTopic (node, projectOrDeviceId, subTopic, broadcast, responseTopic) {
// ↓ Useful for debugging ↓
// console.log(`🔗 buildLinkTopic: for ${OWNER_TYPE} ${node?.ownId || ''} ${projectOrDeviceId} ${subTopic} ${broadcast} ${responseTopic}`)
const topicParts = [TOPIC_HEADER, TOPIC_VERSION, RED.settings.flowforge.teamID]
if (!node || node.type === 'project link call') {
topicParts.push('p')
topicParts.push(project)
topicParts.push(projectOrDeviceId)
if (responseTopic) {
topicParts.push(responseTopic)
} else {
topicParts.push('in')
}
} else if (node.type === 'project link in') {
topicParts.push('p')
if (broadcast && project === 'all') {
if (broadcast && projectOrDeviceId === 'all') { // Listen for broadcast messages from all projects
topicParts.push('+')
topicParts.push('out')
} else if (broadcast) {
topicParts.push(project)
// e.g. SUB ff/v1/7N152GxG2p/p/+/out/a/b
} else if (broadcast) { // Listen for broadcast messages from a specific project
topicParts.push(projectOrDeviceId)
topicParts.push('out')
} else { // self
topicParts.push(RED.settings.flowforge.projectID)
// e.g. SUB ff/v1/7N152GxG2p/p/SOURCE-PROJ-ID-aa97-8915e1897326/out/a/b
} else { // Receive messages sent to this instance
topicParts.push(node.ownId)
topicParts.push('in')
// e.g. SUB ff/v1/7N152GxG2p/p/PROJECT-OWN-ID-aa97-8915e1897326/in/a/b
}
} else if (node.type === 'project link out') {
topicParts.push('p')
if (broadcast) {
topicParts.push(RED.settings.flowforge.projectID)
// publish to all (broadcast)
topicParts.push(node.ownId)
topicParts.push('out')
// e.g. PUB topic ff/v1/7N152GxG2p/p/PROJECT-OWN-ID-aa97-8915e1897326/out/a/b
// e.g. PUB ff/v1/7N152GxG2p/p/dev:<device-id>/out/a/b
} else {
topicParts.push(project)
// publish to a specific project
topicParts.push(projectOrDeviceId)
topicParts.push('in')
// e.g. PUB ff/v1/7N152GxG2p/p/TARGET-PROJ-ID-aa97-8915e1897326/in/a/b
// e.g. PUB ff/v1/7N152GxG2p/p/dev:<device-id>/in/a/b
}
}
topicParts.push(subTopic)
const topic = topicParts.join('/')
// ↓ Useful for debugging ↓
// console.log(`🔗 buildLinkTopic created topic: ${topic}`)
return topic
}
// #endregion Helpers
Expand Down Expand Up @@ -226,14 +240,15 @@ module.exports = function (RED) {
msg = JSON.parse(message.toString(), jsonReviver)
msg.projectLink = {
...msg.projectLink,
instanceId: packet.properties?.userProperties?._projectID,
projectId: packet.properties?.userProperties?._projectID,
instanceId: packet.properties?.userProperties?._projectID || '',
projectId: packet.properties?.userProperties?._projectID || '',
applicationId: packet.properties?.userProperties?._applicationID || '',
topic: topic.split('/').slice(6).join('/')
}
if (packet.properties?.userProperties?._deviceId) {
msg.projectLink.deviceId = packet.properties?.userProperties?._deviceId
msg.projectLink.deviceName = packet.properties?.userProperties?._deviceName
msg.projectLink.deviceType = packet.properties?.userProperties?._deviceType
msg.projectLink.deviceName = packet.properties?.userProperties?._deviceName || ''
msg.projectLink.deviceType = packet.properties?.userProperties?._deviceType || ''
}
} catch (error) {
err = error
Expand Down Expand Up @@ -395,7 +410,11 @@ module.exports = function (RED) {
subOptions.qos = subOptions.qos == null ? 1 : subOptions.qos
subOptions.properties = Object.assign({}, options.properties)
subOptions.properties.userProperties = subOptions.properties.userProperties || {}
subOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID
subOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
subOptions.properties.userProperties._applicationID = RED.settings.flowforge.applicationID || ''
if (OWNER_TYPE === 'application') {
subOptions.properties.userProperties._deviceID = process.env.FF_DEVICE_ID || ''
}
subOptions.properties.userProperties._nodeID = node.id
subOptions.properties.userProperties._ts = Date.now()
if (subID) {
Expand Down Expand Up @@ -464,11 +483,12 @@ module.exports = function (RED) {
pubOptions.qos = pubOptions.qos == null ? 1 : pubOptions.qos
pubOptions.properties = Object.assign({}, options.properties)
pubOptions.properties.userProperties = pubOptions.properties.userProperties || {}
pubOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID
pubOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
pubOptions.properties.userProperties._applicationID = RED.settings.flowforge.applicationID || ''
if (process.env.FF_DEVICE_ID) {
pubOptions.properties.userProperties._deviceId = process.env.FF_DEVICE_ID
pubOptions.properties.userProperties._deviceName = process.env.FF_DEVICE_NAME
pubOptions.properties.userProperties._deviceType = process.env.FF_DEVICE_TYPE
pubOptions.properties.userProperties._deviceName = process.env.FF_DEVICE_NAME || ''
pubOptions.properties.userProperties._deviceType = process.env.FF_DEVICE_TYPE || ''
}
pubOptions.properties.userProperties._nodeID = node.id
pubOptions.properties.userProperties._publishTime = Date.now()
Expand Down Expand Up @@ -511,7 +531,8 @@ module.exports = function (RED) {
requestResponseInformation: true,
requestProblemInformation: true,
userProperties: {
project: RED.settings.flowforge.projectID || ''
project: RED.settings.flowforge.projectID || '',
application: RED.settings.flowforge.applicationID || ''
}
}
}
Expand Down Expand Up @@ -654,6 +675,7 @@ module.exports = function (RED) {
function ProjectLinkInNode (n) {
RED.nodes.createNode(this, n)
const node = this
node.ownId = OWNER_TYPE === 'application' ? 'dev:' + process.env.FF_DEVICE_ID : RED.settings.flowforge.projectID
node.project = n.project
node.subscriptionIdentifier = (n.broadcast && n.project === 'all') ? 2 : 1
node.subTopic = n.topic
Expand All @@ -677,15 +699,17 @@ module.exports = function (RED) {
}
node.receive(msg)
}
// to my inbox
// * this project in ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/in/b1/b1
// to my inbox (direct to device not supported, only direct to an instance is currently supported)
// * this project in ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/in/b1/b1 sub proj→prog
// broadcasts
// * specific project out ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/out/b1/b1 sub broadcast
// * +any project+ out ff/v1/7N152GxG2p/p/+/out/b1/b1 sub broadcast
// * specific project out ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/out/b1/b1 sub broadcast
// * +any project/device+ out ff/v1/7N152GxG2p/p/+/out/b1/b1 sub broadcast
let subscribedTopic = node.topic
if (RED.settings.flowforge.useSharedSubscriptions) {
subscribedTopic = `$share/${RED.settings.flowforge.projectID}/${node.topic}`
}
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-IN SUB ${subscribedTopic}`)
mqtt.subscribe(node, subscribedTopic, { qos: 2 }, onSub)
.then(_result => {})
.catch(err => {
Expand Down Expand Up @@ -717,6 +741,7 @@ module.exports = function (RED) {
function ProjectLinkOutNode (n) {
RED.nodes.createNode(this, n)
const node = this
node.ownId = OWNER_TYPE === 'application' ? 'dev:' + process.env.FF_DEVICE_ID : RED.settings.flowforge.projectID
node.project = n.project
node.subTopic = n.topic
node.mode = n.mode || 'link'
Expand All @@ -729,11 +754,14 @@ module.exports = function (RED) {
if (msg.projectLink?.callStack?.length > 0) {
/** @type {MessageEvent} */
const messageEvent = msg.projectLink.callStack.pop()
if (messageEvent && messageEvent.project && messageEvent.topic && messageEvent.eventId) {
const responseTopic = buildLinkTopic(null, messageEvent.project, messageEvent.topic, node.broadcast, messageEvent.response || 'res')
const targetId = messageEvent.project || `dev:${messageEvent.device}`
if (messageEvent && targetId && messageEvent.topic && messageEvent.eventId) {
const responseTopic = buildLinkTopic(null, targetId, messageEvent.topic, node.broadcast, messageEvent.response || 'res')
const properties = {
correlationData: messageEvent.eventId
}
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-OUT RETURN PUB ${responseTopic}`)
await mqtt.publish(node, responseTopic, msg, { properties })
} else {
node.warn('Project Link Source not valid')
Expand All @@ -744,7 +772,8 @@ module.exports = function (RED) {
done()
} else if (node.mode === 'link') {
const topic = buildLinkTopic(node, node.project, node.subTopic, node.broadcast)
// console.log(`PUB ${topic}`)
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-OUT PUB ${topic}`)
await mqtt.publish(node, topic, msg)
done()
}
Expand All @@ -770,6 +799,7 @@ module.exports = function (RED) {
function ProjectLinkCallNode (n) {
RED.nodes.createNode(this, n)
const node = this
node.ownId = OWNER_TYPE === 'application' ? 'dev:' + process.env.FF_DEVICE_ID : RED.settings.flowforge.projectID
node.project = n.project
node.subTopic = n.topic
node.topic = buildLinkTopic(node, node.project, node.subTopic, false)
Expand All @@ -778,7 +808,8 @@ module.exports = function (RED) {
} else {
node.responseTopicPrefix = 'res'
}
node.responseTopic = buildLinkTopic(node, RED.settings.flowforge.projectID, node.subTopic, false, node.responseTopicPrefix)
node.responseTopic = buildLinkTopic(node, node.ownId, node.subTopic, false, node.responseTopicPrefix)
// node.responseTopic = buildLinkTopic(node, RED.settings.flowforge.projectID, node.subTopic, false, node.responseTopicPrefix)
let timeout = parseFloat(n.timeout || '30') * 1000
if (isNaN(timeout)) {
timeout = 30000
Expand All @@ -805,6 +836,8 @@ module.exports = function (RED) {

mqtt.connect()
mqtt.registerStatus(node)
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-CALL responseTopic SUB ${node.responseTopic}`)
mqtt.subscribe(node, node.responseTopic, { qos: 2 }, onSub)
.then(_result => {})
.catch(err => {
Expand All @@ -820,10 +853,15 @@ module.exports = function (RED) {
eventId,
node: node.id,
project: RED.settings.flowforge.projectID,
instance: RED.settings.flowforge.instanceID,
application: RED.settings.flowforge.applicationID,
topic: node.subTopic,
response: node.responseTopicPrefix,
ts: Date.now()
}
if (process.env.FF_DEVICE_ID) {
messageEvent.device = process.env.FF_DEVICE_ID
}
/** @type {MessageEvents} */
messageEvents[eventId] = {
...messageEvent,
Expand Down Expand Up @@ -857,6 +895,8 @@ module.exports = function (RED) {
correlationData: eventId
}
}
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-CALL PUB ${node.topic}`)
await mqtt.publish(node, node.topic, msg, options)
} catch (error) {
done(error)
Expand Down