Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure all pub/sub MQTT operations use QoS 2 #99

Merged
merged 4 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nodes/project-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ module.exports = function (RED) {
}
/** @type {MQTT.IClientSubscribeOptions} */
const subOptions = Object.assign({}, options)
subOptions.qos = subOptions.qos == null ? 1 : subOptions.qos
subOptions.qos = subOptions.qos == null ? 2 : subOptions.qos
subOptions.properties = Object.assign({}, options.properties)
subOptions.properties.userProperties = subOptions.properties.userProperties || {}
subOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
Expand Down Expand Up @@ -524,7 +524,7 @@ module.exports = function (RED) {
}
/** @type {MQTT.IClientPublishOptions} */
const pubOptions = Object.assign({}, options)
pubOptions.qos = pubOptions.qos == null ? 1 : pubOptions.qos
pubOptions.qos = pubOptions.qos == null ? 2 : pubOptions.qos
pubOptions.properties = Object.assign({}, options.properties)
pubOptions.properties.userProperties = pubOptions.properties.userProperties || {}
pubOptions.properties.userProperties._projectID = RED.settings.flowforge.projectID || ''
Expand Down
242 changes: 184 additions & 58 deletions test/unit/nodes/project-link_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,78 +7,90 @@ const projectLinkPackage = require('../../../nodes/project-link.js')
const { HttpProxyAgent } = require('http-proxy-agent')
const { HttpsProxyAgent } = require('https-proxy-agent')

const TEAM_ID = 'ABCD5678'
const PROJECT_ID = '1234-5678-9012-9876abcdef12'
describe('project-link node', function () {
afterEach(function () {
delete process.env.http_proxy
delete process.env.https_proxy
sinon.restore()
})

describe('proxy', function () {
function setup (httpProxy, httpsProxy, allProxy, noProxy, forgeUrl, mqttUrl) {
const mqttConnectStub = sinon.stub(MQTT, 'connect').returns({
on: sinon.fake(),
subscribe: sinon.fake(),
unsubscribe: sinon.fake(),
publish: sinon.fake(),
removeAllListeners: sinon.fake(),
end: sinon.fake()
})
process.env.http_proxy = httpProxy || ''
process.env.https_proxy = httpsProxy || ''
process.env.all_proxy = allProxy || ''
process.env.no_proxy = noProxy || ''
const nodes = {}
const RED = {
settings: {
flowforge: {
forgeURL: forgeUrl || 'https://local.testfuse.com',
projectID: '1234',
teamID: '5678',
projectLink: {
featureEnabled: true,
broker: {
url: mqttUrl || 'ws://localhost',
clientId: 'nr-project-link'
}
function setup (httpProxy, httpsProxy, allProxy, noProxy, forgeUrl, mqttUrl) {
const mqttStub = {
on: sinon.fake(),
subscribe: sinon.fake(),
unsubscribe: sinon.fake(),
publish: sinon.fake(function (topic, message, options, callback) {
const pkt = { topic, message, options }
callback(null, pkt)
}),
removeAllListeners: sinon.fake(),
end: sinon.fake()
}
const mqttConnectStub = sinon.stub(MQTT, 'connect').returns(mqttStub)
process.env.http_proxy = httpProxy || ''
process.env.https_proxy = httpsProxy || ''
process.env.all_proxy = allProxy || ''
process.env.no_proxy = noProxy || ''
const nodes = {}
const RED = {
settings: {
flowforge: {
forgeURL: forgeUrl || 'https://local.testfuse.com',
projectID: PROJECT_ID,
teamID: TEAM_ID,
projectLink: {
featureEnabled: true,
broker: {
url: mqttUrl || 'ws://localhost',
clientId: 'nr-project-link'
}
}
}
},
log: {
error: sinon.fake(() => { console.error(...arguments) }),
debug: sinon.fake(() => { console.debug(...arguments) }),
log: sinon.fake(() => { console.log(...arguments) }),
warn: sinon.fake(() => { console.warn(...arguments) }),
info: sinon.fake(() => { console.info(...arguments) }),
trace: sinon.fake(() => { console.trace(...arguments) })
},
nodes: {
createNode: function (node, config) {
return node
},
log: {
error: sinon.fake(() => { console.error(...arguments) }),
debug: sinon.fake(() => { console.debug(...arguments) }),
log: sinon.fake(() => { console.log(...arguments) }),
warn: sinon.fake(() => { console.warn(...arguments) }),
info: sinon.fake(() => { console.info(...arguments) }),
trace: sinon.fake(() => { console.trace(...arguments) })
},
nodes: {
createNode: function (node, config) {
return node
},
// RED.nodes.registerType('project link in', ProjectLinkInNode, {
registerType: sinon.fake((name, NodeConstructor, options) => {
nodes[name] = {
name,
NodeConstructor,
options
}
})
},
httpAdmin: {
get: sinon.stub()
},
auth: {
needsPermission: sinon.stub()
// RED.nodes.registerType('project link in', ProjectLinkInNode, {
registerType: sinon.fake((name, NodeConstructor, options) => {
nodes[name] = {
name,
NodeConstructor,
options
}
})
},
httpAdmin: {
get: sinon.stub()
},
auth: {
needsPermission: sinon.stub()
},
util: {
cloneMessage (msg) {
return JSON.parse(JSON.stringify(msg))
}
}
return {
RED,
nodes,
mqttConnectStub
}
}
return {
RED,
nodes,
mqttStub,
mqttConnectStub
}
}

describe('proxy', function () {
afterEach(function () {
sinon.restore()
})
Expand Down Expand Up @@ -255,4 +267,118 @@ describe('project-link node', function () {
})
})
})
describe('Nodes', function () {
it('project link in should subscribe using QoS 2', function () {
const env = setup()
const inNode = {
on: sinon.fake(),
type: 'project link in'
}
const topic = 'cloud/project-nodes-test/xxx'
const RED = env.RED
projectLinkPackage(RED)
const NodeConstructor = env.nodes['project link in'].NodeConstructor
NodeConstructor.call(inNode, { topic, project: PROJECT_ID })

env.mqttStub.subscribe.calledOnce.should.be.true()
should(env.mqttStub.subscribe.args[0][0]).equal(`ff/v1/${TEAM_ID}/p/${PROJECT_ID}/in/${topic}`)
const options = env.mqttStub.subscribe.args[0][1]
should(options).be.an.Object()
options.should.have.property('qos').and.equal(2)
options.should.have.property('properties').and.be.an.Object()
options.properties.should.have.property('subscriptionIdentifier').and.be.a.Number()
})
it('project link call should publish and subscribe using QoS 2', async function () {
const env = setup()
const RED = env.RED
const nodeEvents = {}
const callNode = {
on: (event, cb) => {
nodeEvents[event] = cb
},
error: sinon.fake(),
type: 'project link call'
}
const topic = 'cloud/project-nodes-test/call'
const expectedPubTopic = `ff/v1/${TEAM_ID}/p/${PROJECT_ID}/in/${topic}`
const expectedSubTopic = `ff/v1/${TEAM_ID}/p/${PROJECT_ID}/res/${topic}`
projectLinkPackage(RED)
const NodeConstructor = env.nodes['project link call'].NodeConstructor
NodeConstructor.call(callNode, { topic, project: PROJECT_ID })
callNode.should.have.property('topic', expectedPubTopic)
callNode.should.have.property('responseTopic', expectedSubTopic)
env.mqttStub.subscribe.calledOnce.should.be.true()
should(env.mqttStub.subscribe.args[0][0]).equal(expectedSubTopic)
const options = env.mqttStub.subscribe.args[0][1]
should(options).be.an.Object()
options.should.have.property('qos').and.equal(2)

// send a message to the node so that it can publish
await nodeEvents.input({ payload: 'test' }, sinon.fake(), sinon.fake())

// ensure qos 2 on the publish
env.mqttStub.publish.calledOnce.should.be.true()
const pubTopic = env.mqttStub.publish.args[0][0]
should(pubTopic).equal(expectedPubTopic)
const pubMessageStr = env.mqttStub.publish.args[0][1]
const pubMessage = JSON.parse(pubMessageStr)
should(pubMessage).be.an.Object()
pubMessage.should.have.property('payload').and.equal('test')
pubMessage.should.have.property('projectLink').and.be.an.Object()
pubMessage.projectLink.should.have.property('callStack').and.be.an.Array()
pubMessage.projectLink.callStack.should.have.length(1)
pubMessage.projectLink.callStack[0].should.have.property('topic').and.equal(topic)
pubMessage.projectLink.callStack[0].should.have.property('ts').and.be.a.Number()
pubMessage.projectLink.callStack[0].should.have.property('response').and.equal('res')
pubMessage.projectLink.callStack[0].should.have.property('application')
pubMessage.projectLink.callStack[0].should.have.property('instance')
pubMessage.projectLink.callStack[0].should.have.property('node')
pubMessage.projectLink.callStack[0].should.have.property('project').and.equal(PROJECT_ID)
pubMessage.projectLink.callStack[0].should.have.property('eventId').and.be.a.String()

const pubOptions = env.mqttStub.publish.args[0][2]
should(pubOptions).be.an.Object()
pubOptions.should.have.property('qos').and.equal(2)
})
it('project link out should publish using QoS 2', async function () {
const env = setup()
const RED = env.RED
const nodeEvents = {}
const outNode = {
on: (event, cb) => {
nodeEvents[event] = cb
},
error: sinon.fake(),
type: 'project link call',
mode: 'link'
}
const topic = 'cloud/project-nodes-test/xxx'
const expectedPubTopic = `ff/v1/${TEAM_ID}/p/${PROJECT_ID}/in/${topic}`
projectLinkPackage(RED)
const NodeConstructor = env.nodes['project link out'].NodeConstructor
NodeConstructor.call(outNode, { topic, project: PROJECT_ID })
outNode.should.have.property('subTopic', topic)

// send a message to the node so that it can publish
await nodeEvents.input({ payload: 'test' }, sinon.fake(), sinon.fake())

env.mqttStub.publish.calledOnce.should.be.true()
const pubTopic = env.mqttStub.publish.args[0][0]
should(pubTopic).equal(expectedPubTopic)
const pubMessageStr = env.mqttStub.publish.args[0][1]
const pubMessage = JSON.parse(pubMessageStr)
should(pubMessage).be.an.Object()
pubMessage.should.have.property('payload').and.equal('test')
const pubOptions = env.mqttStub.publish.args[0][2]
should(pubOptions).be.an.Object()
pubOptions.should.have.property('qos').and.equal(2)
pubOptions.should.have.property('properties').and.be.an.Object()
pubOptions.properties.should.have.property('contentType', 'application/json')
pubOptions.properties.should.have.property('userProperties').and.be.an.Object()
pubOptions.properties.userProperties.should.have.property('_nodeID')
pubOptions.properties.userProperties.should.have.property('_projectID', PROJECT_ID)
pubOptions.properties.userProperties.should.have.property('_applicationID')
pubOptions.properties.userProperties.should.have.property('_publishTime').and.be.a.Number()
})
})
})