Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize subscribe handler #316

Merged
merged 1 commit into from
Sep 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 56 additions & 64 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,28 @@ var topicActions = fastfall([
subTopic
])

function SubAck (packet, granted) {
this.cmd = 'suback'
this.messageId = packet.messageId
this.granted = granted
}

function Subscription (qos, func) {
this.qos = qos
this.func = func
}

function SubscribeState (client, packet, finish, granted) {
this.client = client
this.packet = packet
this.finish = finish
this.subState = []
}

function SubState (client, packet, granted) {
this.client = client
this.packet = packet
this.granted = granted
this.subsIndex = 0
}

// if same subscribed topic in subs array, we pick up the last one
Expand All @@ -34,28 +50,26 @@ function _dedupe (subs) {
}

function handleSubscribe (client, packet, done) {
var broker = client.broker
var subs = packet.subscriptions
var granted = []

broker._series(
new SubscribeState(client, packet, done, granted),
client.broker._parallel(
new SubscribeState(client, packet, done),
doSubscribe,
subs.length === 1 ? subs : _dedupe(subs),
completeSubscribe)
}

function doSubscribe (sub, done) {
// TODO this function should not be needed
topicActions.call(this, sub, done)
var s = new SubState(this.client, this.packet, sub.qos)
this.subState.push(s)
topicActions.call(s, sub, done)
}

function authorize (sub, done) {
var client = this.client
var err = validateTopic(sub.topic, 'SUBSCRIBE')
if (err) {
return done(err)
}
var client = this.client
client.broker.authorizeSubscribe(client, sub, done)
}

Expand All @@ -70,72 +84,52 @@ function blockDollarSignTopics (func) {
}
}

function Subscription (qos, func) {
this.qos = qos
this.func = func
}

function storeSubscriptions (sub, done) {
var packet = this.packet
var client = this.client
var broker = client.broker
var perst = broker.persistence

if (!sub) {
this.granted.push(128)
if (!sub || typeof sub !== 'object') {
return done(null, null)
}

if (packet.subscriptions.length > 0 && ++this.subsIndex < packet.subscriptions.length) {
// TODO change aedes subscribe handle, but this fix bugs for now.
return done(null, sub)
}
var packet = this.packet

if (packet.restore) {
return done(null, sub)
}

var client = this.client

if (client.clean) {
return done(null, sub)
}

perst.addSubscriptions(client, packet.subscriptions, function addSub (err) {
client.broker.persistence.addSubscriptions(client, packet.subscriptions, function addSub (err) {
done(err, sub)
})
}

function subTopic (sub, done) {
var client = this.client
var broker = client.broker
var func = nop

if (!sub) {
if (!sub || typeof sub !== 'object') {
this.granted = 128
return done()
}

switch (sub.qos) {
case 2:
case 1:
func = client.deliverQoS
break
default:
func = client.deliver0
break
}
var client = this.client
var broker = client.broker
var topic = sub.topic
var qos = sub.qos
var func = qos > 0 ? client.deliverQoS : client.deliver0

// [MQTT-4.7.2-1]
if (isStartsWithWildcard(sub.topic)) {
if (isStartsWithWildcard(topic)) {
func = blockDollarSignTopics(func)
}

this.granted.push(sub.qos)

if (!client.subscriptions[sub.topic]) {
client.subscriptions[sub.topic] = new Subscription(sub.qos, func)
broker.subscribe(sub.topic, func, done)
} else if (client.subscriptions[sub.topic].qos !== sub.qos) {
broker.unsubscribe(sub.topic, client.subscriptions[sub.topic].func)
client.subscriptions[sub.topic] = new Subscription(sub.qos, func)
broker.subscribe(sub.topic, func, done)
if (!client.subscriptions[topic]) {
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else if (client.subscriptions[topic].qos !== qos) {
broker.unsubscribe(topic, client.subscriptions[topic].func)
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else {
done()
}
Expand All @@ -149,35 +143,39 @@ function isStartsWithWildcard (topic) {
}

function completeSubscribe (err) {
var packet = this.packet
var client = this.client
var broker = client.broker
var granted = this.granted
var done = this.finish

if (err) {
return done(err)
}

var packet = this.packet
var client = this.client
var broker = client.broker
var granted = this.subState.map(obj => obj.granted)
this.subState = []

var subs = packet.subscriptions

if (!packet.restore) {
broker.emit('subscribe', packet.subscriptions, client)
broker.emit('subscribe', subs, client)
}

if (packet.messageId) {
write(client, new SubAck(packet, granted), nop)
}

// negated subscription check
if (this.granted && this.granted[0] === 128) {
if (granted[0] === 128) {
return done()
} else {
done()
}

var persistence = broker.persistence
var topics = []
for (var i = 0; i < packet.subscriptions.length; i++) {
topics.push(packet.subscriptions[i].topic)
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
}
var stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
Expand All @@ -194,12 +192,6 @@ function completeSubscribe (err) {
}))
}

function SubAck (packet, granted) {
this.cmd = 'suback'
this.messageId = packet.messageId
this.granted = granted
}

function nop () {}

module.exports = handleSubscribe