Skip to content

Commit

Permalink
Merge pull request #55 from FlowFuse/3018-support-app-assigned-devices
Browse files Browse the repository at this point in the history
Add support for communicating with app assigned devices
  • Loading branch information
knolleary authored Nov 22, 2023
2 parents 824f194 + 0d0fa85 commit 929dc4c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 33 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A collection of Node-RED nodes for easy communication between Node-RED instances
running in the [FlowFuse platform](https://flowfuse.com).

These nodes act in a similar way to the core Node-RED Link nodes - but can be
used to send and receive messages between different Node-RED instances.
used to send and receive messages between different Node-RED instances and devices.

Whilst these nodes are published under the Apache-2.0 license, they can only be
used with an instance of the FlowFuse platform with an active EE license applied.
Expand All @@ -15,6 +15,7 @@ This can be safely ignored.
### Prerequisites

- FlowFuse 0.8+ running with an active EE license and its integrated MQTT Broker
- FlowFuse 1.14+ for communicating with application assigned devices

Alternatively, you can [sign up to FlowFuse Cloud](https://flowfuse.com/product/)
now to try these nodes out.
Expand Down
10 changes: 5 additions & 5 deletions nodes/project-link.html
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<label><span>&nbsp</span></label>
<span class="ff-project-link-group-option">
<input type="radio" id="ff-project-link-radio-input-broadcast" name="ff-project-link-broadcast" value="true" />
<label for="ff-project-link-radio-input-broadcast">Broadcast message to all instances</label>
<label for="ff-project-link-radio-input-broadcast">Broadcast message</label>
</span>
</div>
<div class="form-row ff-project-link-topic-row">
Expand Down Expand Up @@ -219,7 +219,7 @@
// broadcast not permitted in link call at this time but has been
// considered in the code base - possible future iteration
if (nodeType === 'project link in') {
el.append(new Option('all instances', 'all', false, val === 'all'))
el.append(new Option('all instances and devices', 'all', false, val === 'all'))
}
const projects = (data.count ? data.projects : null) || []
for (let index = 0; index < projects.length; index++) {
Expand Down Expand Up @@ -366,7 +366,7 @@
<script type="text/html" data-help-name="project link in">
<p>Receive messages from other Node-RED instances within your FlowFuse Team</p>
<h3>Details</h3>
<p>This node can either listen for messages broadcast by other instances,
<p>This node can either listen for messages broadcast by other instances and devices,
or listen for messages sent directly to this instance.</p>
<p>The node is configured with a <code>topic</code> to listen on. This works
like an MQTT topic - allowing projects to send messages targeting different
Expand Down Expand Up @@ -404,12 +404,12 @@ <h3>Details</h3>
It provides three modes of operation:
<ul>
<li>send messages to another instance</li>
<li>broadcast messages to any instance listening on the same topic</li>
<li>broadcast messages to any instance or device listening on the same topic</li>
<li>return the message to its sender if it originated from a Project Call node</li>
</ul>
</p>
<p>
When configured to send or broadcast messages to other instances, the node
When configured to send or broadcast messages, the node
is configured with a <code>topic</code> to send on. This works
like an MQTT topic - allowing instances to send messages targeting different
subscribers.
Expand Down
94 changes: 67 additions & 27 deletions nodes/project-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module.exports = function (RED) {
'use strict'

// Do not register nodes in runtime if settings not provided
if (!RED.settings.flowforge || !RED.settings.flowforge.projectID || !RED.settings.flowforge.teamID || !RED.settings.flowforge.projectLink) {
if (!RED.settings.flowforge || !(RED.settings.flowforge.projectID || RED.settings.flowforge.applicationID) || !RED.settings.flowforge.teamID || !RED.settings.flowforge.projectLink) {
throw new Error('Project Link nodes cannot be loaded outside of an FlowFuse EE environment')
}

Expand All @@ -16,6 +16,7 @@ module.exports = function (RED) {
const API_VERSION = 'v1'
const TOPIC_HEADER = 'ff'
const TOPIC_VERSION = 'v1'
const OWNER_TYPE = RED.settings.flowforge.applicationID ? 'application' : 'instance'

// #region JSDoc

Expand Down Expand Up @@ -133,40 +134,53 @@ module.exports = function (RED) {
return result
}

function buildLinkTopic (node, project, subTopic, broadcast, responseTopic) {
function buildLinkTopic (node, projectOrDeviceId, subTopic, broadcast, responseTopic) {
// ↓ Useful for debugging ↓
// console.log(`🔗 buildLinkTopic: for ${OWNER_TYPE} ${node?.ownId || ''} ${projectOrDeviceId} ${subTopic} ${broadcast} ${responseTopic}`)
const topicParts = [TOPIC_HEADER, TOPIC_VERSION, RED.settings.flowforge.teamID]
if (!node || node.type === 'project link call') {
topicParts.push('p')
topicParts.push(project)
topicParts.push(projectOrDeviceId)
if (responseTopic) {
topicParts.push(responseTopic)
} else {
topicParts.push('in')
}
} else if (node.type === 'project link in') {
topicParts.push('p')
if (broadcast && project === 'all') {
if (broadcast && projectOrDeviceId === 'all') { // Listen for broadcast messages from all projects
topicParts.push('+')
topicParts.push('out')
} else if (broadcast) {
topicParts.push(project)
// e.g. SUB ff/v1/7N152GxG2p/p/+/out/a/b
} else if (broadcast) { // Listen for broadcast messages from a specific project
topicParts.push(projectOrDeviceId)
topicParts.push('out')
} else { // self
topicParts.push(RED.settings.flowforge.projectID)
// e.g. SUB ff/v1/7N152GxG2p/p/SOURCE-PROJ-ID-aa97-8915e1897326/out/a/b
} else { // Receive messages sent to this instance
topicParts.push(node.ownId)
topicParts.push('in')
// e.g. SUB ff/v1/7N152GxG2p/p/PROJECT-OWN-ID-aa97-8915e1897326/in/a/b
}
} else if (node.type === 'project link out') {
topicParts.push('p')
if (broadcast) {
topicParts.push(RED.settings.flowforge.projectID)
// publish to all (broadcast)
topicParts.push(node.ownId)
topicParts.push('out')
// e.g. PUB topic ff/v1/7N152GxG2p/p/PROJECT-OWN-ID-aa97-8915e1897326/out/a/b
// e.g. PUB ff/v1/7N152GxG2p/p/dev:<device-id>/out/a/b
} else {
topicParts.push(project)
// publish to a specific project
topicParts.push(projectOrDeviceId)
topicParts.push('in')
// e.g. PUB ff/v1/7N152GxG2p/p/TARGET-PROJ-ID-aa97-8915e1897326/in/a/b
// e.g. PUB ff/v1/7N152GxG2p/p/dev:<device-id>/in/a/b
}
}
topicParts.push(subTopic)
const topic = topicParts.join('/')
// ↓ Useful for debugging ↓
// console.log(`🔗 buildLinkTopic created topic: ${topic}`)
return topic
}
// #endregion Helpers
Expand Down Expand Up @@ -226,14 +240,15 @@ module.exports = function (RED) {
msg = JSON.parse(message.toString(), jsonReviver)
msg.projectLink = {
...msg.projectLink,
instanceId: packet.properties?.userProperties?._projectID,
projectId: packet.properties?.userProperties?._projectID,
instanceId: packet.properties?.userProperties?._projectID || '',
projectId: packet.properties?.userProperties?._projectID || '',
applicationId: packet.properties?.userProperties?._applicationID || '',
topic: topic.split('/').slice(6).join('/')
}
if (packet.properties?.userProperties?._deviceId) {
msg.projectLink.deviceId = packet.properties?.userProperties?._deviceId
msg.projectLink.deviceName = packet.properties?.userProperties?._deviceName
msg.projectLink.deviceType = packet.properties?.userProperties?._deviceType
msg.projectLink.deviceName = packet.properties?.userProperties?._deviceName || ''
msg.projectLink.deviceType = packet.properties?.userProperties?._deviceType || ''
}
} catch (error) {
err = error
Expand Down Expand Up @@ -395,7 +410,11 @@ module.exports = function (RED) {
subOptions.qos = subOptions.qos == null ? 1 : subOptions.qos
subOptions.properties = Object.assign({}, options.properties)
subOptions.properties.userProperties = subOptions.properties.userProperties || {}
subOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID
subOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
subOptions.properties.userProperties._applicationID = RED.settings.flowforge.applicationID || ''
if (OWNER_TYPE === 'application') {
subOptions.properties.userProperties._deviceID = process.env.FF_DEVICE_ID || ''
}
subOptions.properties.userProperties._nodeID = node.id
subOptions.properties.userProperties._ts = Date.now()
if (subID) {
Expand Down Expand Up @@ -464,11 +483,12 @@ module.exports = function (RED) {
pubOptions.qos = pubOptions.qos == null ? 1 : pubOptions.qos
pubOptions.properties = Object.assign({}, options.properties)
pubOptions.properties.userProperties = pubOptions.properties.userProperties || {}
pubOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID
pubOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
pubOptions.properties.userProperties._applicationID = RED.settings.flowforge.applicationID || ''
if (process.env.FF_DEVICE_ID) {
pubOptions.properties.userProperties._deviceId = process.env.FF_DEVICE_ID
pubOptions.properties.userProperties._deviceName = process.env.FF_DEVICE_NAME
pubOptions.properties.userProperties._deviceType = process.env.FF_DEVICE_TYPE
pubOptions.properties.userProperties._deviceName = process.env.FF_DEVICE_NAME || ''
pubOptions.properties.userProperties._deviceType = process.env.FF_DEVICE_TYPE || ''
}
pubOptions.properties.userProperties._nodeID = node.id
pubOptions.properties.userProperties._publishTime = Date.now()
Expand Down Expand Up @@ -511,7 +531,8 @@ module.exports = function (RED) {
requestResponseInformation: true,
requestProblemInformation: true,
userProperties: {
project: RED.settings.flowforge.projectID || ''
project: RED.settings.flowforge.projectID || '',
application: RED.settings.flowforge.applicationID || ''
}
}
}
Expand Down Expand Up @@ -654,6 +675,7 @@ module.exports = function (RED) {
function ProjectLinkInNode (n) {
RED.nodes.createNode(this, n)
const node = this
node.ownId = OWNER_TYPE === 'application' ? 'dev:' + process.env.FF_DEVICE_ID : RED.settings.flowforge.projectID
node.project = n.project
node.subscriptionIdentifier = (n.broadcast && n.project === 'all') ? 2 : 1
node.subTopic = n.topic
Expand All @@ -677,15 +699,17 @@ module.exports = function (RED) {
}
node.receive(msg)
}
// to my inbox
// * this project in ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/in/b1/b1
// to my inbox (direct to device not supported, only direct to an instance is currently supported)
// * this project in ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/in/b1/b1 sub proj→prog
// broadcasts
// * specific project out ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/out/b1/b1 sub broadcast
// * +any project+ out ff/v1/7N152GxG2p/p/+/out/b1/b1 sub broadcast
// * specific project out ff/v1/7N152GxG2p/p/ca65f5ed-aea0-4a10-ac9a-2086b6af6700/out/b1/b1 sub broadcast
// * +any project/device+ out ff/v1/7N152GxG2p/p/+/out/b1/b1 sub broadcast
let subscribedTopic = node.topic
if (RED.settings.flowforge.useSharedSubscriptions) {
subscribedTopic = `$share/${RED.settings.flowforge.projectID}/${node.topic}`
}
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-IN SUB ${subscribedTopic}`)
mqtt.subscribe(node, subscribedTopic, { qos: 2 }, onSub)
.then(_result => {})
.catch(err => {
Expand Down Expand Up @@ -717,6 +741,7 @@ module.exports = function (RED) {
function ProjectLinkOutNode (n) {
RED.nodes.createNode(this, n)
const node = this
node.ownId = OWNER_TYPE === 'application' ? 'dev:' + process.env.FF_DEVICE_ID : RED.settings.flowforge.projectID
node.project = n.project
node.subTopic = n.topic
node.mode = n.mode || 'link'
Expand All @@ -729,11 +754,14 @@ module.exports = function (RED) {
if (msg.projectLink?.callStack?.length > 0) {
/** @type {MessageEvent} */
const messageEvent = msg.projectLink.callStack.pop()
if (messageEvent && messageEvent.project && messageEvent.topic && messageEvent.eventId) {
const responseTopic = buildLinkTopic(null, messageEvent.project, messageEvent.topic, node.broadcast, messageEvent.response || 'res')
const targetId = messageEvent.project || `dev:${messageEvent.device}`
if (messageEvent && targetId && messageEvent.topic && messageEvent.eventId) {
const responseTopic = buildLinkTopic(null, targetId, messageEvent.topic, node.broadcast, messageEvent.response || 'res')
const properties = {
correlationData: messageEvent.eventId
}
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-OUT RETURN PUB ${responseTopic}`)
await mqtt.publish(node, responseTopic, msg, { properties })
} else {
node.warn('Project Link Source not valid')
Expand All @@ -744,7 +772,8 @@ module.exports = function (RED) {
done()
} else if (node.mode === 'link') {
const topic = buildLinkTopic(node, node.project, node.subTopic, node.broadcast)
// console.log(`PUB ${topic}`)
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-OUT PUB ${topic}`)
await mqtt.publish(node, topic, msg)
done()
}
Expand All @@ -770,6 +799,7 @@ module.exports = function (RED) {
function ProjectLinkCallNode (n) {
RED.nodes.createNode(this, n)
const node = this
node.ownId = OWNER_TYPE === 'application' ? 'dev:' + process.env.FF_DEVICE_ID : RED.settings.flowforge.projectID
node.project = n.project
node.subTopic = n.topic
node.topic = buildLinkTopic(node, node.project, node.subTopic, false)
Expand All @@ -778,7 +808,8 @@ module.exports = function (RED) {
} else {
node.responseTopicPrefix = 'res'
}
node.responseTopic = buildLinkTopic(node, RED.settings.flowforge.projectID, node.subTopic, false, node.responseTopicPrefix)
node.responseTopic = buildLinkTopic(node, node.ownId, node.subTopic, false, node.responseTopicPrefix)
// node.responseTopic = buildLinkTopic(node, RED.settings.flowforge.projectID, node.subTopic, false, node.responseTopicPrefix)
let timeout = parseFloat(n.timeout || '30') * 1000
if (isNaN(timeout)) {
timeout = 30000
Expand All @@ -805,6 +836,8 @@ module.exports = function (RED) {

mqtt.connect()
mqtt.registerStatus(node)
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-CALL responseTopic SUB ${node.responseTopic}`)
mqtt.subscribe(node, node.responseTopic, { qos: 2 }, onSub)
.then(_result => {})
.catch(err => {
Expand All @@ -820,10 +853,15 @@ module.exports = function (RED) {
eventId,
node: node.id,
project: RED.settings.flowforge.projectID,
instance: RED.settings.flowforge.instanceID,
application: RED.settings.flowforge.applicationID,
topic: node.subTopic,
response: node.responseTopicPrefix,
ts: Date.now()
}
if (process.env.FF_DEVICE_ID) {
messageEvent.device = process.env.FF_DEVICE_ID
}
/** @type {MessageEvents} */
messageEvents[eventId] = {
...messageEvent,
Expand Down Expand Up @@ -857,6 +895,8 @@ module.exports = function (RED) {
correlationData: eventId
}
}
// ↓ Useful for debugging ↓
// console.log(`🔗 LINK-CALL PUB ${node.topic}`)
await mqtt.publish(node, node.topic, msg, options)
} catch (error) {
done(error)
Expand Down

0 comments on commit 929dc4c

Please sign in to comment.