Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: better qos support #323

Merged
merged 6 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions src/adapters/mqtt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ interface IMQTTHeaders {
length: number,
}

const MQTT_UNSPECIFIED_ERROR_REASON = 0x80
const MQTT_SUCCESS_REASON = 0

class MqttAdapter extends Adapter {
private client: MqttClient
private firstConnect: boolean
Expand All @@ -30,7 +33,8 @@ class MqttAdapter extends Adapter {
_connect(): Promise<this> {
return new Promise((resolve) => {
const subscribedChannels = this.getSubscribedChannels()
const serverBinding = this.AsyncAPIServer.binding('mqtt')
const mqttServerBinding = this.AsyncAPIServer.binding('mqtt')
const mqtt5ServerBinding = this.AsyncAPIServer.binding('mqtt5')
const securityRequirements = (this.AsyncAPIServer.security() || []).map(sec => {
const secName = Object.keys(sec.json())[0]
return this.parsedAsyncAPI.components().securityScheme(secName)
Expand All @@ -42,12 +46,18 @@ class MqttAdapter extends Adapter {
const certsConfig = process.env.GLEE_SERVER_CERTS?.split(',').map(t => t.split(':'))
const certs = certsConfig?.filter(tuple => tuple[0] === this.serverName)?.map(t => fs.readFileSync(t[1])) // eslint-disable-line security/detect-non-literal-fs-filename

const serverBinding = mqttServerBinding
const protocolVersion = mqtt5ServerBinding ? 5 : 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should get protocolVersion from the server object instead:

Suggested change
const serverBinding = mqttServerBinding
const protocolVersion = mqtt5ServerBinding ? 5 : 4
const protocolVersion = parseInt(this.AsyncAPIServer.protocolVersion() || 4)
const serverBinding = protocolVersion === 5 ? mqtt5ServerBinding : mqttServerBinding

Just because the mqtt5 exists in the AsyncAPI document it doesn't mean it should be used.


this.client = mqtt.connect({
host: url.host,
port: url.port || (url.protocol === 'mqtt:' ? 1883 : 8883),
protocol: url.protocol.substr(0, url.protocol.length - 1),
clientId: serverBinding && serverBinding.clientId,
clean: serverBinding && serverBinding.cleanSession,
properties: {
sessionExpiryInterval: serverBinding && serverBinding.sessionExpiryInterval
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MQTT bindings (mqtt and mqtt5) don't have a sessionExpiryInterval property. We have to evaluate if that's something the MQTT5 binding should have, otherwise, we can't make a reference to it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Session expiry interval is specified in the MQTT spec. I think we should add it. It's required to have persistent sessions, and utilize QoS in MQTT.

},
will: serverBinding && serverBinding.will && {
topic: serverBinding && serverBinding.lastWill && serverBinding.lastWill.topic ? serverBinding.lastWill.topic : undefined,
qos: serverBinding && serverBinding.lastWill && serverBinding.lastWill.qos ? serverBinding.lastWill.qos : undefined,
Expand All @@ -58,15 +68,19 @@ class MqttAdapter extends Adapter {
username: userAndPasswordSecurityReq ? process.env.GLEE_USERNAME : undefined,
password: userAndPasswordSecurityReq ? process.env.GLEE_PASSWORD : undefined,
ca: X509SecurityReq ? certs : undefined,
protocolVersion,
customHandleAcks: this._customAckHandler.bind(this),
})

this.client.on('connect', () => {
this.client.on('connect', connAckPacket => {
const isSessionResume = connAckPacket.sessionPresent

if (!this.firstConnect) {
this.firstConnect = true
this.emit('connect', { name: this.name(), adapter: this, connection: this.client, channels: this.channelNames })
}

if (Array.isArray(subscribedChannels)) {
if (!isSessionResume && Array.isArray(subscribedChannels)) {
subscribedChannels.forEach((channel) => {
const operation = this.parsedAsyncAPI.channel(channel).publish()
const binding = operation.binding('mqtt')
Expand All @@ -80,6 +94,9 @@ class MqttAdapter extends Adapter {
})

this.client.on('message', (channel, message, mqttPacket) => {
const qos = mqttPacket.qos
if ( protocolVersion === 5 && qos > 0 ) return // ignore higher qos messages. already processed

const msg = this._createMessage(mqttPacket as IPublishPacket)
this.emit('message', msg, this.client)
})
Expand Down Expand Up @@ -137,6 +154,15 @@ class MqttAdapter extends Adapter {
channel: packet.topic,
})
}

_customAckHandler(channel, message, mqttPacket, done) {
const msg = this._createMessage(mqttPacket as IPublishPacket)

msg.on('successfullyProcessed', () => done(MQTT_SUCCESS_REASON))
msg.on('failedProcessing', () => done(MQTT_UNSPECIFIED_ERROR_REASON))

this.emit('message', msg, this.client)
}
}

export default MqttAdapter
3 changes: 3 additions & 0 deletions src/lib/glee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ export default class Glee extends EventEmitter {

async.seq(...mws)(message, (err: Error, msg: GleeMessage) => {
if (err) {
message.notifyFailedProcessing()
debug('Error encountered while processing middlewares.')
this._processError(errorMiddlewares, err, msg)
return
}
Expand All @@ -238,6 +240,7 @@ export default class Glee extends EventEmitter {
}
})
} else {
message.notifySuccessfulProcessing()
debug('Inbound pipeline finished.')
}
})
Expand Down
14 changes: 14 additions & 0 deletions src/lib/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ class GleeMessage extends EventEmitter {
send() {
this.emit('send', this)
}

/**
* Indicates successfully processed the message
*/
notifySuccessfulProcessing() {
this.emit('successfullyProcessed')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's follow the structure we're using everywhere else in the code base:

Suggested change
this.emit('successfullyProcessed')
this.emit('processing:successful')

}

/**
* Indicates failure in processing the message
*/
notifyFailedProcessing() {
this.emit('failedProcessing')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.emit('failedProcessing')
this.emit('processing:failed')

}
}

export default GleeMessage