Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: better qos support #323

Merged
merged 6 commits into from
Jan 25, 2023

Conversation

sudoshreyansh
Copy link
Contributor

Description

  • inbound QoS > 0 messages now get processed before sending the ACK response.
  • support for persistent sessions in MQTT using the sessionExpiryInterval property of the MQTT server binding.

MQTT versions before 5.0 cannot be supported due to the limitations in the protocol and the MQTT.js client library.

Related issue(s)
See also #27

@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 2 Code Smells

No Coverage information No Coverage information
0.0% 0.0% Duplication

@coveralls
Copy link

coveralls commented Jul 28, 2022

Pull Request Test Coverage Report for Build 4003324759

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 3 of 5 (60.0%) changed or added relevant lines in 2 files are covered.
  • 1 unchanged line in 1 file lost coverage.
  • Overall coverage decreased (-0.02%) to 61.89%

Changes Missing Coverage Covered Lines Changed/Added Lines %
src/lib/glee.ts 2 3 66.67%
src/lib/message.ts 1 2 50.0%
Files with Coverage Reduction New Missed Lines %
src/lib/glee.ts 1 90.12%
Totals Coverage Status
Change from base Build 3988887327: -0.02%
Covered Lines: 319
Relevant Lines: 441

💛 - Coveralls

Copy link
Member

@fmvilas fmvilas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a bunch of comments. Great start! 🚀

@@ -48,6 +48,9 @@ class MqttAdapter extends Adapter {
protocol: url.protocol.substr(0, url.protocol.length - 1),
clientId: serverBinding && serverBinding.clientId,
clean: serverBinding && serverBinding.cleanSession,
properties: {
sessionExpiryInterval: serverBinding && serverBinding.sessionExpiryInterval
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MQTT bindings (mqtt and mqtt5) don't have a sessionExpiryInterval property. We have to evaluate if that's something the MQTT5 binding should have, otherwise, we can't make a reference to it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Session expiry interval is specified in the MQTT spec. I think we should add it. It's required to have persistent sessions, and utilize QoS in MQTT.

@@ -58,15 +61,19 @@ class MqttAdapter extends Adapter {
username: userAndPasswordSecurityReq ? process.env.GLEE_USERNAME : undefined,
password: userAndPasswordSecurityReq ? process.env.GLEE_PASSWORD : undefined,
ca: X509SecurityReq ? certs : undefined,
protocolVersion: 5,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do this, this adapter is always going to use MQTT 5. This must be a conditional value. Maybe based on the bindings. If the mqtt bindings exist then it's version 3.x, if the mqtt5 binding exists then it's version 5. Or maybe a configuration in the glee.config.js file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MQTT specifies 3 for version 3.1 and 4 for version 3.1.1. So for the mqtt binding, which version will be used?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go for version 4 given it's the latest for 3.x (that's really weird 😅)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sure

src/adapters/mqtt/index.ts Outdated Show resolved Hide resolved
src/lib/message.ts Outdated Show resolved Hide resolved
src/lib/message.ts Outdated Show resolved Hide resolved
@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

No Coverage information No Coverage information
0.0% 0.0% Duplication

Comment on lines 49 to 50
const serverBinding = mqttServerBinding
const protocolVersion = mqtt5ServerBinding ? 5 : 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should get protocolVersion from the server object instead:

Suggested change
const serverBinding = mqttServerBinding
const protocolVersion = mqtt5ServerBinding ? 5 : 4
const protocolVersion = parseInt(this.AsyncAPIServer.protocolVersion() || 4)
const serverBinding = protocolVersion === 5 ? mqtt5ServerBinding : mqttServerBinding

Just because the mqtt5 exists in the AsyncAPI document it doesn't mean it should be used.

* Indicates successfully processed the message
*/
notifySuccessfulProcessing() {
this.emit('successfullyProcessed')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's follow the structure we're using everywhere else in the code base:

Suggested change
this.emit('successfullyProcessed')
this.emit('processing:successful')

* Indicates failure in processing the message
*/
notifyFailedProcessing() {
this.emit('failedProcessing')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.emit('failedProcessing')
this.emit('processing:failed')

@fmvilas
Copy link
Member

fmvilas commented Jan 23, 2023

@sudoshreyansh you've got some conflicts. It looks good to me already but you need to solve them before we can merge.

@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

No Coverage information No Coverage information
0.0% 0.0% Duplication

@sudoshreyansh sudoshreyansh requested a review from fmvilas January 25, 2023 06:11
@sudoshreyansh
Copy link
Contributor Author

@fmvilas I have fixed the conflicts. Can you please re-review?

Copy link
Member

@fmvilas fmvilas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 LET'S GOOOO!

@fmvilas
Copy link
Member

fmvilas commented Jan 25, 2023

/rtm

@asyncapi-bot asyncapi-bot merged commit d07d74e into asyncapi:master Jan 25, 2023
@fmvilas
Copy link
Member

fmvilas commented Jan 25, 2023

Congratulations on finishing your mentorship program, @sudoshreyansh 🎉 💰 😄

@asyncapi-bot
Copy link
Contributor

🎉 This PR is included in version 0.16.0 🎉

The release is available on:

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants