From 29e5ddfd0c1689ff063b2f117063fe42fea8ee33 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Mon, 2 Sep 2019 17:34:24 +0800 Subject: [PATCH] Parallelize subscribe handler --- lib/handlers/subscribe.js | 120 ++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 64 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 64ab0073..91849da8 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -11,12 +11,28 @@ var topicActions = fastfall([ subTopic ]) +function SubAck (packet, granted) { + this.cmd = 'suback' + this.messageId = packet.messageId + this.granted = granted +} + +function Subscription (qos, func) { + this.qos = qos + this.func = func +} + function SubscribeState (client, packet, finish, granted) { this.client = client this.packet = packet this.finish = finish + this.subState = [] +} + +function SubState (client, packet, granted) { + this.client = client + this.packet = packet this.granted = granted - this.subsIndex = 0 } // if same subscribed topic in subs array, we pick up the last one @@ -34,28 +50,26 @@ function _dedupe (subs) { } function handleSubscribe (client, packet, done) { - var broker = client.broker var subs = packet.subscriptions - var granted = [] - - broker._series( - new SubscribeState(client, packet, done, granted), + client.broker._parallel( + new SubscribeState(client, packet, done), doSubscribe, subs.length === 1 ? subs : _dedupe(subs), completeSubscribe) } function doSubscribe (sub, done) { - // TODO this function should not be needed - topicActions.call(this, sub, done) + var s = new SubState(this.client, this.packet, sub.qos) + this.subState.push(s) + topicActions.call(s, sub, done) } function authorize (sub, done) { - var client = this.client var err = validateTopic(sub.topic, 'SUBSCRIBE') if (err) { return done(err) } + var client = this.client client.broker.authorizeSubscribe(client, sub, done) } @@ -70,72 +84,52 @@ function blockDollarSignTopics (func) { } } -function Subscription (qos, func) { - this.qos = qos - this.func = func -} - function storeSubscriptions (sub, done) { - var packet = this.packet - var client = this.client - var broker = client.broker - var perst = broker.persistence - - if (!sub) { - this.granted.push(128) + if (!sub || typeof sub !== 'object') { + return done(null, null) } - if (packet.subscriptions.length > 0 && ++this.subsIndex < packet.subscriptions.length) { - // TODO change aedes subscribe handle, but this fix bugs for now. - return done(null, sub) - } + var packet = this.packet if (packet.restore) { return done(null, sub) } + var client = this.client + if (client.clean) { return done(null, sub) } - perst.addSubscriptions(client, packet.subscriptions, function addSub (err) { + client.broker.persistence.addSubscriptions(client, packet.subscriptions, function addSub (err) { done(err, sub) }) } function subTopic (sub, done) { - var client = this.client - var broker = client.broker - var func = nop - - if (!sub) { + if (!sub || typeof sub !== 'object') { + this.granted = 128 return done() } - switch (sub.qos) { - case 2: - case 1: - func = client.deliverQoS - break - default: - func = client.deliver0 - break - } + var client = this.client + var broker = client.broker + var topic = sub.topic + var qos = sub.qos + var func = qos > 0 ? client.deliverQoS : client.deliver0 // [MQTT-4.7.2-1] - if (isStartsWithWildcard(sub.topic)) { + if (isStartsWithWildcard(topic)) { func = blockDollarSignTopics(func) } - this.granted.push(sub.qos) - - if (!client.subscriptions[sub.topic]) { - client.subscriptions[sub.topic] = new Subscription(sub.qos, func) - broker.subscribe(sub.topic, func, done) - } else if (client.subscriptions[sub.topic].qos !== sub.qos) { - broker.unsubscribe(sub.topic, client.subscriptions[sub.topic].func) - client.subscriptions[sub.topic] = new Subscription(sub.qos, func) - broker.subscribe(sub.topic, func, done) + if (!client.subscriptions[topic]) { + client.subscriptions[topic] = new Subscription(qos, func) + broker.subscribe(topic, func, done) + } else if (client.subscriptions[topic].qos !== qos) { + broker.unsubscribe(topic, client.subscriptions[topic].func) + client.subscriptions[topic] = new Subscription(qos, func) + broker.subscribe(topic, func, done) } else { done() } @@ -149,18 +143,22 @@ function isStartsWithWildcard (topic) { } function completeSubscribe (err) { - var packet = this.packet - var client = this.client - var broker = client.broker - var granted = this.granted var done = this.finish if (err) { return done(err) } + var packet = this.packet + var client = this.client + var broker = client.broker + var granted = this.subState.map(obj => obj.granted) + this.subState = [] + + var subs = packet.subscriptions + if (!packet.restore) { - broker.emit('subscribe', packet.subscriptions, client) + broker.emit('subscribe', subs, client) } if (packet.messageId) { @@ -168,7 +166,7 @@ function completeSubscribe (err) { } // negated subscription check - if (this.granted && this.granted[0] === 128) { + if (granted[0] === 128) { return done() } else { done() @@ -176,8 +174,8 @@ function completeSubscribe (err) { var persistence = broker.persistence var topics = [] - for (var i = 0; i < packet.subscriptions.length; i++) { - topics.push(packet.subscriptions[i].topic) + for (var i = 0; i < subs.length; i++) { + topics.push(subs[i].topic) } var stream = persistence.createRetainedStreamCombi(topics) stream.pipe(through.obj(function sendRetained (packet, enc, cb) { @@ -194,12 +192,6 @@ function completeSubscribe (err) { })) } -function SubAck (packet, granted) { - this.cmd = 'suback' - this.messageId = packet.messageId - this.granted = granted -} - function nop () {} module.exports = handleSubscribe