diff --git a/README.md b/README.md
index ae48dd9..1171a77 100644
--- a/README.md
+++ b/README.md
@@ -87,6 +87,7 @@ API
---
* mqtt#generate()
+ * mqtt#writeToStream()
* mqtt#parser()
@@ -96,6 +97,15 @@ Generates a `Buffer` containing an MQTT packet.
The object must be one of the ones specified by the [packets](#packets)
section. Throws an `Error` if a packet cannot be generated.
+
+### mqtt.writeToStream(object, stream)
+
+Writes the mqtt packet defined by `object` to the given stream.
+The object must be one of the ones specified by the [packets](#packets)
+section. Emits an `Error` on the stream if a packet cannot be generated.
+On node >= 12, this function automatically calls `cork()` on your stream,
+and then it calls `uncork()` on the next tick.
+
### mqtt.parser()
diff --git a/benchmarks/generate.js b/benchmarks/generate.js
index 6389cf4..cac2a9d 100644
--- a/benchmarks/generate.js
+++ b/benchmarks/generate.js
@@ -1,6 +1,6 @@
var mqtt = require('../')
- , max = 10000000
+ , max = 100000
, i
, start = Date.now()
, time
diff --git a/benchmarks/generateTick.js b/benchmarks/generateTick.js
new file mode 100644
index 0000000..e4eefc4
--- /dev/null
+++ b/benchmarks/generateTick.js
@@ -0,0 +1,53 @@
+
+var mqtt = require('../')
+ , max = 1000000
+ , i = 0
+ , start = Date.now()
+ , time
+ , buf = new Buffer(10)
+ , net = require('net')
+ , server = net.createServer(handle)
+ , dest
+
+buf.fill('test')
+
+function handle(sock) {
+ sock.resume();
+}
+
+server.listen(0, function() {
+ dest = net.connect(server.address());
+
+ dest.on('connect', tickWait);
+ dest.on('drain', tickWait);
+
+ dest.on('finish', function () {
+ time = Date.now() - start;
+ console.log('Total time', time);
+ console.log('Total packets', max);
+ console.log('Packet/s', max / time * 1000);
+ server.close();
+ });
+});
+
+function tickWait () {
+ //console.log('tickWait', i)
+ var res = true
+ //var toSend = new Buffer(5 + buf.length)
+
+ for (; i < max && res; i++) {
+ res = dest.write(mqtt.generate({
+ cmd: 'publish'
+ , topic: 'test'
+ , payload: buf
+ }))
+ //buf.copy(toSend, 5)
+ //res = dest.write(toSend, 'buffer')
+ //console.log(res)
+ }
+
+ if (i >= max) {
+ dest.end();
+ return;
+ }
+}
diff --git a/benchmarks/writeToStream.js b/benchmarks/writeToStream.js
new file mode 100644
index 0000000..d595299
--- /dev/null
+++ b/benchmarks/writeToStream.js
@@ -0,0 +1,51 @@
+
+var mqtt = require('../')
+ , max = 1000000
+ , i = 0
+ , start = Date.now()
+ , time
+ , buf = new Buffer(10)
+ , net = require('net')
+ , server = net.createServer(handle)
+ , dest
+
+function handle(sock) {
+ sock.resume();
+}
+
+buf.fill('test')
+
+server.listen(0, function() {
+ dest = net.connect(server.address());
+
+ dest.on('connect', tickWait);
+ dest.on('drain', tickWait);
+
+ dest.on('finish', function () {
+ time = Date.now() - start;
+ console.log('Total time', time);
+ console.log('Total packets', max);
+ console.log('Packet/s', max / time * 1000);
+ server.close();
+ });
+});
+
+function tickWait() {
+ var res = true
+ //var toSend = new Buffer(5)
+
+ for (; i < max && res; i++) {
+ res = mqtt.writeToStream({
+ cmd: 'publish'
+ , topic: 'test'
+ , payload: buf
+ }, dest)
+ //dest.write(toSend, 'buffer')
+ //res = dest.write(buf, 'buffer')
+ }
+
+ if (i >= max) {
+ dest.end();
+ return;
+ }
+}
diff --git a/constants.js b/constants.js
index 04e26fd..bc1cf71 100644
--- a/constants.js
+++ b/constants.js
@@ -1,7 +1,8 @@
/* Protocol - protocol constants */
+var protocol = module.exports;
/* Command code => mnemonic */
-module.exports.types = {
+protocol.types = {
0: 'reserved',
1: 'connect',
2: 'connack',
@@ -21,32 +22,86 @@ module.exports.types = {
};
/* Mnemonic => Command code */
-module.exports.codes = {}
-for(var k in module.exports.types) {
- var v = module.exports.types[k];
- module.exports.codes[v] = k;
+protocol.codes = {}
+for(var k in protocol.types) {
+ var v = protocol.types[k];
+ protocol.codes[v] = k;
}
/* Header */
-module.exports.CMD_SHIFT = 4;
-module.exports.CMD_MASK = 0xF0;
-module.exports.DUP_MASK = 0x08;
-module.exports.QOS_MASK = 0x03;
-module.exports.QOS_SHIFT = 1;
-module.exports.RETAIN_MASK = 0x01;
+protocol.CMD_SHIFT = 4;
+protocol.CMD_MASK = 0xF0;
+protocol.DUP_MASK = 0x08;
+protocol.QOS_MASK = 0x03;
+protocol.QOS_SHIFT = 1;
+protocol.RETAIN_MASK = 0x01;
/* Length */
-module.exports.LENGTH_MASK = 0x7F;
-module.exports.LENGTH_FIN_MASK = 0x80;
+protocol.LENGTH_MASK = 0x7F;
+protocol.LENGTH_FIN_MASK = 0x80;
/* Connack */
-module.exports.SESSIONPRESENT_MASK = 0x01;
+protocol.SESSIONPRESENT_MASK = 0x01;
+protocol.SESSIONPRESENT_HEADER = new Buffer([protocol.SESSIONPRESENT_MASK]);
+protocol.CONNACK_HEADER = new Buffer([protocol.codes['connack'] << protocol.CMD_SHIFT])
/* Connect */
-module.exports.USERNAME_MASK = 0x80;
-module.exports.PASSWORD_MASK = 0x40;
-module.exports.WILL_RETAIN_MASK = 0x20;
-module.exports.WILL_QOS_MASK = 0x18;
-module.exports.WILL_QOS_SHIFT = 3;
-module.exports.WILL_FLAG_MASK = 0x04;
-module.exports.CLEAN_SESSION_MASK = 0x02;
+protocol.USERNAME_MASK = 0x80;
+protocol.PASSWORD_MASK = 0x40;
+protocol.WILL_RETAIN_MASK = 0x20;
+protocol.WILL_QOS_MASK = 0x18;
+protocol.WILL_QOS_SHIFT = 3;
+protocol.WILL_FLAG_MASK = 0x04;
+protocol.CLEAN_SESSION_MASK = 0x02;
+protocol.CONNECT_HEADER = new Buffer([protocol.codes['connect'] << protocol.CMD_SHIFT])
+
+function genHeader (type) {
+ return [0, 1, 2].map(function(qos) {
+ return [0, 1].map(function(dup) {
+ return [0, 1].map(function(retain) {
+ var buf = new Buffer(1)
+ buf.writeUInt8(
+ protocol.codes[type] << protocol.CMD_SHIFT |
+ (dup ? protocol.DUP_MASK : 0 ) |
+ qos << protocol.QOS_SHIFT | retain, 0, true)
+ return buf
+ });
+ });
+ });
+}
+
+/* Publish */
+protocol.PUBLISH_HEADER = genHeader('publish');
+
+/* SUBSCRIBE */
+protocol.SUBSCRIBE_HEADER = genHeader('subscribe');
+
+/* UNSUBSCRIBE */
+protocol.UNSUBSCRIBE_HEADER = genHeader('unsubscribe');
+
+/* Confirmations */
+protocol.ACKS = {
+ unsuback: genHeader('unsuback'),
+ puback: genHeader('puback'),
+ pubcomp: genHeader('pubcomp'),
+ pubrel: genHeader('pubrel'),
+ pubrec: genHeader('pubrec')
+};
+
+protocol.SUBACK_HEADER = new Buffer([protocol.codes['suback'] << protocol.CMD_SHIFT]);
+
+/* Protocol versions */
+protocol.VERSION3 = new Buffer([3])
+protocol.VERSION4 = new Buffer([4])
+
+/* QOS */
+protocol.QOS = [0, 1, 2].map(function(qos) {
+ return new Buffer([qos])
+})
+
+/* empty packets */
+protocol.EMPTY = {
+ pingreq: new Buffer([protocol.codes['pingreq'] << 4, 0]),
+ pingresp: new Buffer([protocol.codes['pingresp'] << 4, 0]),
+ disconnect: new Buffer([protocol.codes['disconnect'] << 4, 0])
+};
diff --git a/generate.js b/generate.js
index 2c4fad5..8a34867 100644
--- a/generate.js
+++ b/generate.js
@@ -1,614 +1,57 @@
-
'use strict';
-var protocol = require('./constants')
- , empty = new Buffer(0)
+var writeToStream = require('./writeToStream')
+ , EE = require('events').EventEmitter
+ , inherits = require('inherits')
function generate(packet) {
+ var stream = new Accumulator()
+ writeToStream(packet, stream)
+ return stream.concat()
+}
- switch (packet.cmd) {
- case 'connect':
- return connect(packet)
- case 'connack':
- return connack(packet)
- case 'publish':
- return publish(packet)
- case 'puback':
- case 'pubrec':
- case 'pubrel':
- case 'pubcomp':
- case 'unsuback':
- return confirmation(packet)
- case 'subscribe':
- return subscribe(packet)
- case 'suback':
- return suback(packet)
- case 'unsubscribe':
- return unsubscribe(packet)
- case 'pingreq':
- case 'pingresp':
- case 'disconnect':
- return emptyPacket(packet)
- default:
- throw new Error('unknown command')
- }
+function Accumulator() {
+ this._array = new Array(20)
+ this._i = 0
}
-function connect(opts) {
- var opts = opts || {}
- , protocolId = opts.protocolId || 'MQTT'
- , protocolVersion = opts.protocolVersion || 4
- , will = opts.will
- , clean = opts.clean
- , keepalive = opts.keepalive || 0
- , clientId = opts.clientId || ""
- , username = opts.username
- , password = opts.password
+inherits(Accumulator, EE)
- if (clean === undefined) {
- clean = true
- }
+Accumulator.prototype.write = function (chunk) {
+ this._array[this._i++] = chunk
+ return true
+};
+Accumulator.prototype.concat = function () {
var length = 0
+ , lengths = new Array(this._array.length)
+ , list = this._array
+ , pos = 0
+ , i
+ , result;
- // Must be a string and non-falsy
- if (!protocolId ||
- (typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) {
- throw new Error('Invalid protocol id')
- } else {
- length += protocolId.length + 2
- }
-
- // Must be a 1 byte number
- if (!protocolVersion ||
- 'number' !== typeof protocolVersion ||
- protocolVersion > 255 ||
- protocolVersion < 0) {
-
- throw new Error('Invalid protocol version')
- } else {
- length += 1
- }
-
- // ClientId might be omitted in 3.1.1, but only if cleanSession is set to 1
- if ((typeof clientId === "string" || Buffer.isBuffer(clientId)) &&
- (clientId || protocolVersion == 4) &&
- (clientId || clean)) {
-
- length += clientId.length + 2
- } else {
-
- if(protocolVersion < 4) {
-
- throw new Error('clientId must be supplied before 3.1.1');
- }
-
- if(clean == 0) {
-
- throw new Error('clientId must be given if cleanSession set to 0');
- }
- }
-
- // Must be a two byte number
- if ('number' !== typeof keepalive ||
- keepalive < 0 ||
- keepalive > 65535) {
- throw new Error('Invalid keepalive')
- } else {
- length += 2
- }
-
- // Connect flags
- length += 1
-
- // If will exists...
- if (will) {
- // It must be an object
- if ('object' !== typeof will) {
- throw new Error('Invalid will')
- }
- // It must have topic typeof string
- if (!will.topic || 'string' !== typeof will.topic) {
- throw new Error('Invalid will topic')
- } else {
- length += Buffer.byteLength(will.topic) + 2
- }
-
- // Payload
- if (will.payload && will.payload) {
- if (will.payload.length >= 0) {
- if ('string' === typeof will.payload) {
- length += Buffer.byteLength(will.payload) + 2
- } else {
- length += will.payload.length + 2
- }
- } else {
- throw new Error('Invalid will payload')
- }
+ for (i = 0; i < list.length && list[i]; i++) {
+ if (typeof list[i] !== 'string') {
+ lengths[i] = list[i].length;
} else {
- length += 2
+ lengths[i] = Buffer.byteLength(list[i]);
}
+ length += lengths[i];
}
- // Username
- if (username) {
- if (username.length) {
- length += Buffer.byteLength(username) + 2
- } else {
- throw new Error('Invalid username')
- }
- }
+ result = new Buffer(length);
- // Password
- if (password) {
- if (password.length) {
- length += byteLength(password) + 2
+ for (i = 0; i < list.length && list[i]; i++) {
+ if (typeof list[i] !== 'string') {
+ list[i].copy(result, pos);
+ pos += lengths[i];
} else {
- throw new Error('Invalid password')
- }
- }
-
- var buffer = new Buffer(1 + calcLengthLength(length) + length)
- , pos = 0
-
- // Generate header
- buffer.writeUInt8(protocol.codes['connect'] << protocol.CMD_SHIFT, pos++, true)
-
- // Generate length
- pos += writeLength(buffer, pos, length)
-
- // Generate protocol ID
- pos += writeStringOrBuffer(buffer, pos, protocolId)
- buffer.writeUInt8(protocolVersion, pos++, true)
-
- // Connect flags
- var flags = 0
- flags |= username ? protocol.USERNAME_MASK : 0
- flags |= password ? protocol.PASSWORD_MASK : 0
- flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
- flags |= (will && will.qos) ?
- will.qos << protocol.WILL_QOS_SHIFT : 0
- flags |= will ? protocol.WILL_FLAG_MASK : 0
- flags |= clean ? protocol.CLEAN_SESSION_MASK : 0
-
- buffer.writeUInt8(flags, pos++, true)
-
- // Keepalive
- pos += writeNumber(buffer, pos, keepalive)
-
- // Client ID
- pos += writeStringOrBuffer(buffer, pos, clientId)
-
- // Will
- if (will) {
- pos += writeString(buffer, pos, will.topic)
- pos += writeStringOrBuffer(buffer, pos, will.payload)
- }
-
- // Username and password
- if (username)
- pos += writeStringOrBuffer(buffer, pos, username)
-
- if (password)
- pos += writeStringOrBuffer(buffer, pos, password)
-
- return buffer
-}
-
-function connack(opts) {
- var opts = opts || {}
- , rc = opts.returnCode;
-
- // Check return code
- if ('number' !== typeof rc)
- throw new Error('Invalid return code');
-
- var buffer = new Buffer(4)
- , pos = 0;
-
- buffer.writeUInt8(protocol.codes['connack'] << protocol.CMD_SHIFT, pos++, true);
- pos += writeLength(buffer, pos, 2);
- buffer.writeUInt8(opts.sessionPresent && protocol.SESSIONPRESENT_MASK || 0, pos++, true);
- buffer.writeUInt8(rc, pos++, true);
-
- return buffer;
-}
-
-function publish(opts) {
- var opts = opts || {}
- , dup = opts.dup ? protocol.DUP_MASK : 0
- , qos = opts.qos
- , retain = opts.retain ? protocol.RETAIN_MASK : 0
- , topic = opts.topic
- , payload = opts.payload || empty
- , id = opts.messageId;
-
- var length = 0;
-
- // Topic must be a non-empty string or Buffer
- if (typeof topic === "string")
- length += Buffer.byteLength(topic) + 2;
- else if (Buffer.isBuffer(topic))
- length += topic.length + 2;
- else
- throw new Error('Invalid topic');
-
- // get the payload length
- if (!Buffer.isBuffer(payload)) {
- length += Buffer.byteLength(payload);
- } else {
- length += payload.length;
- }
-
- // Message id must a number if qos > 0
- if (qos && 'number' !== typeof id) {
- throw new Error('Invalid message id')
- } else if (qos) {
- length += 2;
- }
-
- var buffer = new Buffer(1 + calcLengthLength(length) + length)
- , pos = 0;
-
- // Header
- buffer.writeUInt8(
- protocol.codes['publish'] << protocol.CMD_SHIFT |
- dup |
- qos << protocol.QOS_SHIFT |
- retain, pos++, true);
-
- // Remaining length
- pos += writeLength(buffer, pos, length);
-
- // Topic
- pos += writeStringOrBuffer(buffer, pos, topic);
-
- // Message ID
- if (qos > 0) {
- pos += writeNumber(buffer, pos, id);
- }
-
- // Payload
- if (!Buffer.isBuffer(payload)) {
- writeStringNoPos(buffer, pos, payload);
- } else {
- writeBuffer(buffer, pos, payload);
- }
-
- return buffer;
-}
-
-/* Puback, pubrec, pubrel and pubcomp */
-function confirmation(opts) {
- var opts = opts || {}
- , type = opts.cmd || 'puback'
- , id = opts.messageId
- , dup = (opts.dup && type === 'pubrel') ? protocol.DUP_MASK : 0
- , qos = 0
-
- if (type === 'pubrel')
- qos = 1
-
- // Check message ID
- if ('number' !== typeof id)
- throw new Error('Invalid message id');
-
- var buffer = new Buffer(4)
- , pos = 0;
-
- // Header
- buffer[pos++] =
- protocol.codes[type] << protocol.CMD_SHIFT |
- dup |
- qos << protocol.QOS_SHIFT;
-
- // Length
- pos += writeLength(buffer, pos, 2);
-
- // Message ID
- pos += writeNumber(buffer, pos, id);
-
- return buffer;
-}
-
-function subscribe(opts) {
- var opts = opts || {}
- , dup = opts.dup ? protocol.DUP_MASK : 0
- , qos = opts.qos || 0
- , id = opts.messageId
- , subs = opts.subscriptions;
-
- var length = 0;
-
- // Check mid
- if ('number' !== typeof id) {
- throw new Error('Invalid message id');
- } else {
- length += 2;
- }
- // Check subscriptions
- if ('object' === typeof subs && subs.length) {
- for (var i = 0; i < subs.length; i += 1) {
- var topic = subs[i].topic
- , qos = subs[i].qos;
-
- if ('string' !== typeof topic) {
- throw new Error('Invalid subscriptions - invalid topic');
- }
- if ('number' !== typeof qos) {
- throw new Error('Invalid subscriptions - invalid qos');
- }
-
- length += Buffer.byteLength(topic) + 2 + 1;
- }
- } else {
- throw new Error('Invalid subscriptions');
- }
-
- var buffer = new Buffer(1 + calcLengthLength(length) + length)
- , pos = 0;
-
- // Generate header
- buffer.writeUInt8(
- protocol.codes['subscribe'] << protocol.CMD_SHIFT |
- dup |
- 1 << protocol.QOS_SHIFT, pos++, true);
-
- // Generate length
- pos += writeLength(buffer, pos, length);
-
- // Generate message ID
- pos += writeNumber(buffer, pos, id);
-
- // Generate subs
- for (var i = 0; i < subs.length; i++) {
- var sub = subs[i]
- , topic = sub.topic
- , qos = sub.qos;
-
- // Write topic string
- pos += writeString(buffer, pos, topic);
- // Write qos
- buffer.writeUInt8(qos, pos++, true);
- }
-
- return buffer;
-}
-
-function suback(opts) {
- var opts = opts || {}
- , id = opts.messageId
- , granted = opts.granted;
-
- var length = 0;
-
- // Check message id
- if ('number' !== typeof id) {
- throw new Error('Invalid message id');
- } else {
- length += 2;
- }
- // Check granted qos vector
- if ('object' === typeof granted && granted.length) {
- for (var i = 0; i < granted.length; i += 1) {
- if ('number' !== typeof granted[i]) {
- throw new Error('Invalid qos vector');
- }
- length += 1;
- }
- } else {
- throw new Error('Invalid qos vector');
- }
-
- var buffer = new Buffer(1 + calcLengthLength(length) + length)
- , pos = 0;
-
- // Header
- buffer.writeUInt8(protocol.codes['suback'] << protocol.CMD_SHIFT, pos++, true);
-
- // Length
- pos += writeLength(buffer, pos, length);
-
- // Message ID
- pos += writeNumber(buffer, pos, id);
-
- // Subscriptions
- for (var i = 0; i < granted.length; i++) {
- buffer.writeUInt8(granted[i], pos++, true);
- }
-
- return buffer;
-}
-
-function unsubscribe(opts) {
- var opts = opts || {}
- , id = opts.messageId
- , dup = opts.dup ? protocol.DUP_MASK : 0
- , unsubs = opts.unsubscriptions;
-
- var length = 0;
-
- // Check message id
- if ('number' !== typeof id) {
- throw new Error('Invalid message id');
- } else {
- length += 2;
- }
- // Check unsubs
- if ('object' === typeof unsubs && unsubs.length) {
- for (var i = 0; i < unsubs.length; i += 1) {
- if ('string' !== typeof unsubs[i]) {
- throw new Error('Invalid unsubscriptions');
- }
- length += Buffer.byteLength(unsubs[i]) + 2;
- }
- } else {
- throw new Error('Invalid unsubscriptions');
- }
-
- var buffer = new Buffer(1 + calcLengthLength(length) + length)
- , pos = 0;
-
- // Header
- buffer[pos++] =
- protocol.codes['unsubscribe'] << protocol.CMD_SHIFT |
- dup |
- 1 << protocol.QOS_SHIFT;
-
- // Length
- pos += writeLength(buffer, pos, length);
-
- // Message ID
- pos += writeNumber(buffer, pos, id);
-
- // Unsubs
- for (var i = 0; i < unsubs.length; i++) {
- pos += writeString(buffer, pos, unsubs[i]);
- }
-
- return buffer;
-}
-
-function emptyPacket(opts) {
- var buf = new Buffer(2);
- buf[0] = protocol.codes[opts.cmd] << 4;
- buf[1] = 0;
- return buf;
-}
-
-/**
- * calcLengthLength - calculate the length of the remaining
- * length field
- *
- * @api private
- */
-function calcLengthLength(length) {
- if (length >= 0 && length < 128) {
- return 1
- } else if (length >= 128 && length < 16384) {
- return 2
- } else if (length >= 16384 && length < 2097152) {
- return 3
- } else if (length >= 2097152 && length < 268435456) {
- return 4
- } else {
- return 0
- }
-}
-
-/**
- * writeLength - write an MQTT style length field to the buffer
- *
- * @param buffer - destination
- * @param pos - offset
- * @param length - length (>0)
- * @returns number of bytes written
- *
- * @api private
- */
-
-function writeLength(buffer, pos, length) {
- var digit = 0
- , origPos = pos
-
- do {
- digit = length % 128 | 0
- length = length / 128 | 0
- if (length > 0) {
- digit = digit | 0x80
+ result.write(list[i], pos);
+ pos += lengths[i];
}
- buffer.writeUInt8(digit, pos++, true)
- } while (length > 0)
-
- return pos - origPos
-}
-
-/**
- * writeString - write a utf8 string to the buffer
- *
- * @param buffer - destination
- * @param pos - offset
- * @param string - string to write
- * @return number of bytes written
- *
- * @api private
- */
-
-function writeString(buffer, pos, string) {
- var strlen = Buffer.byteLength(string)
- writeNumber(buffer, pos, strlen)
-
- writeStringNoPos(buffer, pos + 2, string)
-
- return strlen + 2
-}
-
-function writeStringNoPos(buffer, pos, string) {
- buffer.write(string, pos)
-}
-
-/**
- * write_buffer - write buffer to buffer
- *
- * @param buffer - dest buffer
- * @param pos - offset
- * @param src - source buffer
- * @return number of bytes written
- *
- * @api private
- */
-
-function writeBuffer(buffer, pos, src) {
- src.copy(buffer, pos)
- return src.length
-}
-
-/**
- * writeNumber - write a two byte number to the buffer
- *
- * @param buffer - destination
- * @param pos - offset
- * @param number - number to write
- * @return number of bytes written
- *
- * @api private
- */
-function writeNumber(buffer, pos, number) {
- buffer.writeUInt8(number >> 8, pos, true)
- buffer.writeUInt8(number & 0x00FF, pos + 1, true)
-
- return 2
-}
-
-/**
- * writeStringOrBuffer - write a String or Buffer with the its length prefix
- *
- * @param buffer - destination
- * @param pos - offset
- * @param toWrite - String or Buffer
- * @return number of bytes written
- */
-function writeStringOrBuffer(buffer, pos, toWrite) {
- var written = 0
-
- if (toWrite && typeof toWrite === 'string') {
- written += writeString(buffer, pos + written, toWrite)
- } else if (toWrite) {
- written += writeNumber(buffer, pos + written, toWrite.length)
- written += writeBuffer(buffer, pos + written, toWrite)
- } else {
- written += writeNumber(buffer, pos + written, 0)
}
- return written
-}
-
-function byteLength(bufOrString) {
- if (Buffer.isBuffer(bufOrString)) {
- return bufOrString.length
- } else {
- return Buffer.byteLength(bufOrString)
- }
-}
+ return result;
+};
module.exports = generate
diff --git a/mqtt.js b/mqtt.js
index c2dbcfe..ccafe0b 100644
--- a/mqtt.js
+++ b/mqtt.js
@@ -3,3 +3,4 @@
exports.parser = require('./parser')
exports.generate = require('./generate')
+exports.writeToStream = require('./writeToStream')
diff --git a/numbers.js b/numbers.js
new file mode 100644
index 0000000..a05fe38
--- /dev/null
+++ b/numbers.js
@@ -0,0 +1,14 @@
+'use strict'
+
+var max = 65536
+var cache = {}
+var buffer
+
+for (var i = 0; i < max; i++) {
+ buffer = new Buffer(2)
+ buffer.writeUInt8(i >> 8, 0, true)
+ buffer.writeUInt8(i & 0x00FF, 0 + 1, true)
+ cache[i] = buffer;
+}
+
+module.exports = cache
diff --git a/package.json b/package.json
index cb2633e..638ae8c 100644
--- a/package.json
+++ b/package.json
@@ -29,6 +29,7 @@
},
"homepage": "https://github.com/mqttjs/mqtt-packet",
"devDependencies": {
+ "dev-null": "^0.1.1",
"faucet": "0.0.1",
"pre-commit": "^1.1.1",
"tape": "^4.2.0"
diff --git a/writeToStream.js b/writeToStream.js
new file mode 100644
index 0000000..a2c58c5
--- /dev/null
+++ b/writeToStream.js
@@ -0,0 +1,587 @@
+
+'use strict';
+
+var protocol = require('./constants')
+ , empty = new Buffer(0)
+ , zeroBuf = new Buffer([0])
+ , numCache = require('./numbers')
+
+function generate(packet, stream) {
+ if (stream.cork) {
+ stream.cork()
+ process.nextTick(uncork, stream)
+ }
+
+ switch (packet.cmd) {
+ case 'connect':
+ return connect(packet, stream);
+ case 'connack':
+ return connack(packet, stream);
+ case 'publish':
+ return publish(packet, stream);
+ case 'puback':
+ case 'pubrec':
+ case 'pubrel':
+ case 'pubcomp':
+ case 'unsuback':
+ return confirmation(packet, stream);
+ case 'subscribe':
+ return subscribe(packet, stream);
+ case 'suback':
+ return suback(packet, stream);
+ case 'unsubscribe':
+ return unsubscribe(packet, stream);
+ case 'pingreq':
+ case 'pingresp':
+ case 'disconnect':
+ return emptyPacket(packet, stream);
+ default:
+ stream.emit('error', new Error('unknown command'));
+ return false;
+ }
+}
+
+function uncork(stream) {
+ stream.uncork();
+}
+
+function connect(opts, stream) {
+ var opts = opts || {}
+ , protocolId = opts.protocolId || 'MQTT'
+ , protocolVersion = opts.protocolVersion || 4
+ , will = opts.will
+ , clean = opts.clean
+ , keepalive = opts.keepalive || 0
+ , clientId = opts.clientId || ""
+ , username = opts.username
+ , password = opts.password
+
+ if (clean === undefined) {
+ clean = true
+ }
+
+ var length = 0
+
+ // Must be a string and non-falsy
+ if (!protocolId ||
+ (typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) {
+ stream.emit('error', new Error('Invalid protocol id'))
+ } else {
+ length += protocolId.length + 2
+ }
+
+ // Must be 3 or 4
+ if (protocolVersion !== 3 && protocolVersion !== 4) {
+ stream.emit('error', new Error('Invalid protocol version'))
+ } else {
+ length += 1
+ }
+
+ // ClientId might be omitted in 3.1.1, but only if cleanSession is set to 1
+ if ((typeof clientId === "string" || Buffer.isBuffer(clientId)) &&
+ (clientId || protocolVersion == 4) &&
+ (clientId || clean)) {
+
+ length += clientId.length + 2
+ } else {
+
+ if (protocolVersion < 4) {
+ stream.emit('error', new Error('clientId must be supplied before 3.1.1'));
+ }
+
+ if (clean == 0) {
+ stream.emit('error', new Error('clientId must be given if cleanSession set to 0'));
+ }
+ }
+
+ // Must be a two byte number
+ if ('number' !== typeof keepalive ||
+ keepalive < 0 ||
+ keepalive > 65535) {
+ stream.emit('error', new Error('Invalid keepalive'))
+ } else {
+ length += 2
+ }
+
+ // Connect flags
+ length += 1
+
+ // If will exists...
+ if (will) {
+ // It must be an object
+ if ('object' !== typeof will) {
+ stream.emit('error', new Error('Invalid will'))
+ }
+ // It must have topic typeof string
+ if (!will.topic || 'string' !== typeof will.topic) {
+ stream.emit('error', new Error('Invalid will topic'))
+ } else {
+ length += Buffer.byteLength(will.topic) + 2
+ }
+
+ // Payload
+ if (will.payload && will.payload) {
+ if (will.payload.length >= 0) {
+ if ('string' === typeof will.payload) {
+ length += Buffer.byteLength(will.payload) + 2
+ } else {
+ length += will.payload.length + 2
+ }
+ } else {
+ stream.emit('error', new Error('Invalid will payload'))
+ }
+ } else {
+ length += 2
+ }
+ }
+
+ // Username
+ if (username) {
+ if (username.length) {
+ length += Buffer.byteLength(username) + 2
+ } else {
+ stream.emit('error', new Error('Invalid username'))
+ }
+ }
+
+ // Password
+ if (password) {
+ if (password.length) {
+ length += byteLength(password) + 2
+ } else {
+ stream.emit('error', new Error('Invalid password'))
+ }
+ }
+
+ // Generate header
+ stream.write(protocol.CONNECT_HEADER);
+
+ // Generate length
+ writeLength(stream, length)
+
+ // Generate protocol ID
+ writeStringOrBuffer(stream, protocolId)
+ stream.write(
+ protocolVersion === 4 ?
+ protocol.VERSION4 : protocol.VERSION3
+ );
+
+ // Connect flags
+ var flags = 0
+ flags |= username ? protocol.USERNAME_MASK : 0
+ flags |= password ? protocol.PASSWORD_MASK : 0
+ flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
+ flags |= (will && will.qos) ?
+ will.qos << protocol.WILL_QOS_SHIFT : 0
+ flags |= will ? protocol.WILL_FLAG_MASK : 0
+ flags |= clean ? protocol.CLEAN_SESSION_MASK : 0
+
+ stream.write(new Buffer([flags]));
+
+ // Keepalive
+ writeNumber(stream, keepalive);
+
+ // Client ID
+ writeStringOrBuffer(stream, clientId);
+
+ // Will
+ if (will) {
+ writeString(stream, will.topic);
+ writeStringOrBuffer(stream, will.payload);
+ }
+
+ // Username and password
+ if (username)
+ writeStringOrBuffer(stream, username);
+
+ if (password)
+ writeStringOrBuffer(stream, password);
+
+ // this is a small packet that
+ // happens only once on a stream
+ // we assume the stream is always free
+ // to receive more data after this
+ return true
+}
+
+function connack(opts, stream) {
+ var opts = opts || {}
+ , rc = opts.returnCode;
+
+ // Check return code
+ if ('number' !== typeof rc)
+ stream.emit('error', new Error('Invalid return code'));
+
+ stream.write(protocol.CONNACK_HEADER);
+ writeLength(stream, 2);
+ stream.write(opts.sessionPresent ?
+ protocol.SESSIONPRESENT_HEADER : zeroBuf);
+
+ return stream.write(new Buffer([rc]));
+}
+
+function publish(opts, stream) {
+ var opts = opts || {}
+ , qos = opts.qos || 0
+ , retain = opts.retain ? protocol.RETAIN_MASK : 0
+ , topic = opts.topic
+ , payload = opts.payload || empty
+ , id = opts.messageId;
+
+ var length = 0;
+
+ // Topic must be a non-empty string or Buffer
+ if (typeof topic === "string")
+ length += Buffer.byteLength(topic) + 2;
+ else if (Buffer.isBuffer(topic))
+ length += topic.length + 2;
+ else
+ stream.emit('error', new Error('Invalid topic'));
+
+ // get the payload length
+ if (!Buffer.isBuffer(payload)) {
+ length += Buffer.byteLength(payload);
+ } else {
+ length += payload.length;
+ }
+
+ // Message id must a number if qos > 0
+ if (qos && 'number' !== typeof id) {
+ stream.emit('error', new Error('Invalid message id'))
+ } else if (qos) {
+ length += 2;
+ }
+
+ // Header
+ stream.write(protocol.PUBLISH_HEADER[qos][opts.dup ? 1 : 0][retain ? 1 : 0]);
+
+ // Remaining length
+ writeLength(stream, length);
+
+ // Topic
+ writeNumber(stream, byteLength(topic));
+ stream.write(topic);
+
+ // Message ID
+ if (qos > 0) {
+ writeNumber(stream, id);
+ }
+
+ // Payload
+ return stream.write(payload)
+}
+
+/* Puback, pubrec, pubrel and pubcomp */
+function confirmation(opts, stream) {
+ var opts = opts || {}
+ , type = opts.cmd || 'puback'
+ , id = opts.messageId
+ , dup = (opts.dup && type === 'pubrel') ? protocol.DUP_MASK : 0
+ , qos = 0
+
+ if (type === 'pubrel')
+ qos = 1
+
+ // Check message ID
+ if ('number' !== typeof id)
+ stream.emit('error', new Error('Invalid message id'));
+
+ // Header
+ stream.write(protocol.ACKS[type][qos][dup][0])
+
+ // Length
+ writeLength(stream, 2);
+
+ // Message ID
+ return writeNumber(stream, id);
+}
+
+function subscribe(opts, stream) {
+ var opts = opts || {}
+ , dup = opts.dup ? protocol.DUP_MASK : 0
+ , qos = opts.qos || 0
+ , id = opts.messageId
+ , subs = opts.subscriptions;
+
+ var length = 0;
+
+ // Check mid
+ if ('number' !== typeof id) {
+ stream.emit('error', new Error('Invalid message id'));
+ } else {
+ length += 2;
+ }
+ // Check subscriptions
+ if ('object' === typeof subs && subs.length) {
+ for (var i = 0; i < subs.length; i += 1) {
+ var topic = subs[i].topic
+ , qos = subs[i].qos;
+
+ if ('string' !== typeof topic) {
+ stream.emit('error', new Error('Invalid subscriptions - invalid topic'));
+ }
+ if ('number' !== typeof qos) {
+ stream.emit('error', new Error('Invalid subscriptions - invalid qos'));
+ }
+
+ length += Buffer.byteLength(topic) + 2 + 1;
+ }
+ } else {
+ stream.emit('error', new Error('Invalid subscriptions'));
+ }
+
+ // Generate header
+ stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0]);
+
+ // Generate length
+ writeLength(stream, length);
+
+ // Generate message ID
+ writeNumber(stream, id);
+
+ var result = true
+
+ // Generate subs
+ for (var i = 0; i < subs.length; i++) {
+ var sub = subs[i]
+ , topic = sub.topic
+ , qos = sub.qos;
+
+ // Write topic string
+ writeString(stream, topic);
+ // Write qos
+ result = stream.write(protocol.QOS[qos]);
+ }
+
+ return result;
+}
+
+function suback(opts, stream) {
+ var opts = opts || {}
+ , id = opts.messageId
+ , granted = opts.granted;
+
+ var length = 0;
+
+ // Check message id
+ if ('number' !== typeof id) {
+ stream.emit('error', new Error('Invalid message id'));
+ } else {
+ length += 2;
+ }
+ // Check granted qos vector
+ if ('object' === typeof granted && granted.length) {
+ for (var i = 0; i < granted.length; i += 1) {
+ if ('number' !== typeof granted[i]) {
+ stream.emit('error', new Error('Invalid qos vector'));
+ }
+ length += 1;
+ }
+ } else {
+ stream.emit('error', new Error('Invalid qos vector'));
+ }
+
+ // header
+ stream.write(protocol.SUBACK_HEADER);
+
+ // Length
+ writeLength(stream, length);
+
+ // Message ID
+ writeNumber(stream, id);
+
+ return stream.write(new Buffer(granted));
+}
+
+function unsubscribe(opts, stream) {
+ var opts = opts || {}
+ , id = opts.messageId
+ , dup = opts.dup ? protocol.DUP_MASK : 0
+ , unsubs = opts.unsubscriptions;
+
+ var length = 0;
+
+ // Check message id
+ if ('number' !== typeof id) {
+ stream.emit('error', new Error('Invalid message id'));
+ } else {
+ length += 2;
+ }
+ // Check unsubs
+ if ('object' === typeof unsubs && unsubs.length) {
+ for (var i = 0; i < unsubs.length; i += 1) {
+ if ('string' !== typeof unsubs[i]) {
+ stream.emit('error', new Error('Invalid unsubscriptions'));
+ }
+ length += Buffer.byteLength(unsubs[i]) + 2;
+ }
+ } else {
+ stream.emit('error', new Error('Invalid unsubscriptions'));
+ }
+
+ // Header
+ stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0]);
+
+ // Length
+ writeLength(stream, length);
+
+ // Message ID
+ writeNumber(stream, id);
+
+ // Unsubs
+ var result = true
+ for (var i = 0; i < unsubs.length; i++) {
+ result = writeString(stream, unsubs[i]);
+ }
+
+ return result;
+}
+
+function emptyPacket(opts, stream) {
+ return stream.write(protocol.EMPTY[opts.cmd]);
+}
+
+/**
+ * calcLengthLength - calculate the length of the remaining
+ * length field
+ *
+ * @api private
+ */
+function calcLengthLength(length) {
+ if (length >= 0 && length < 128) {
+ return 1
+ } else if (length >= 128 && length < 16384) {
+ return 2
+ } else if (length >= 16384 && length < 2097152) {
+ return 3
+ } else if (length >= 2097152 && length < 268435456) {
+ return 4
+ } else {
+ return 0
+ }
+}
+
+function genBufLength(length) {
+ var digit = 0
+ , pos = 0
+ , buffer = new Buffer(calcLengthLength(length))
+
+ do {
+ digit = length % 128 | 0
+ length = length / 128 | 0
+ if (length > 0) {
+ digit = digit | 0x80
+ }
+ buffer.writeUInt8(digit, pos++, true)
+ } while (length > 0)
+
+ return buffer
+}
+
+/**
+ * writeLength - write an MQTT style length field to the buffer
+ *
+ * @param buffer - destination
+ * @param pos - offset
+ * @param length - length (>0)
+ * @returns number of bytes written
+ *
+ * @api private
+ */
+
+var lengthCache = {}
+function writeLength(stream, length) {
+ var buffer = lengthCache[length]
+
+ if (!buffer) {
+ buffer = genBufLength(length)
+ if (length < 16384) {
+ lengthCache[length] = buffer
+ }
+ }
+
+ stream.write(buffer)
+}
+
+/**
+ * writeString - write a utf8 string to the buffer
+ *
+ * @param buffer - destination
+ * @param pos - offset
+ * @param string - string to write
+ * @return number of bytes written
+ *
+ * @api private
+ */
+
+function writeString(stream, string) {
+ var strlen = Buffer.byteLength(string)
+ writeNumber(stream, strlen)
+
+ stream.write(string, 'utf8')
+}
+
+function writeStringNoPos(buffer, pos, string) {
+ buffer.write(string, pos)
+}
+
+/**
+ * write_buffer - write buffer to buffer
+ *
+ * @param buffer - dest buffer
+ * @param pos - offset
+ * @param src - source buffer
+ * @return number of bytes written
+ *
+ * @api private
+ */
+
+function writeBuffer(buffer, pos, src) {
+ src.copy(buffer, pos)
+ return src.length
+}
+
+/**
+ * writeNumber - write a two byte number to the buffer
+ *
+ * @param buffer - destination
+ * @param pos - offset
+ * @param number - number to write
+ * @return number of bytes written
+ *
+ * @api private
+ */
+function writeNumber(stream, number) {
+ return stream.write(numCache[number])
+}
+
+/**
+ * writeStringOrBuffer - write a String or Buffer with the its length prefix
+ *
+ * @param buffer - destination
+ * @param pos - offset
+ * @param toWrite - String or Buffer
+ * @return number of bytes written
+ */
+function writeStringOrBuffer(stream, toWrite) {
+ if (toWrite && typeof toWrite === 'string') {
+ writeString(stream, toWrite)
+ } else if (toWrite) {
+ writeNumber(stream, toWrite.length)
+ stream.write(toWrite)
+ } else {
+ writeNumber(stream, 0)
+ }
+}
+
+function byteLength(bufOrString) {
+ if (!bufOrString) {
+ return 0
+ } else if (Buffer.isBuffer(bufOrString)) {
+ return bufOrString.length
+ } else {
+ return Buffer.byteLength(bufOrString)
+ }
+}
+
+module.exports = generate