Skip to content

Commit

Permalink
perf: ⚡️ generate buffer for every packet
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Oct 5, 2020
1 parent 914a52e commit b799c8e
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 141 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ Run main benchmark, which creates a single parser and runs various packet types

```
node benchmarks/bench.js
mqtt-codec x 49,683 ops/sec ±1.17% (93 runs sampled)
mqtt-packet x 11,073 ops/sec ±1.39% (90 runs sampled)
mqtt-codec x 55,148 ops/sec ±1.72% (90 runs sampled)
mqtt-packet x 10,535 ops/sec ±1.55% (92 runs sampled)
Fastest is mqtt-codec
```

Expand Down
93 changes: 21 additions & 72 deletions src/MqttDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,30 +131,31 @@ export class MqttDecoder {
this.state = DECODER_STATE.HEADER;
this.offset = 0;

const buf = list.slice(offset, offset + l);
list.consume(packetEndOffset);

const type: PACKET_TYPE = (b >> 4) as PACKET_TYPE;
switch (type) {
case PACKET_TYPE.PUBLISH: {
const topic = parseBinary(list, offset);
let offset = 0;
const topic = parseBinary(buf, offset);
offset += 2 + topic.byteLength;
let i: number = 0;
if (((b >> 1) & 0b11) > 0) {
i = list.readUInt16BE(offset);
i = buf.readUInt16BE(offset);
offset += 2;
}
let p: Properties = {};
if (this.version === 5) {
const [props, size] = parseProps(list, offset);
const [props, size] = parseProps(buf, offset);
p = props;
offset += size;
}
const d = list.slice(offset, packetEndOffset);
const d = buf.slice(offset, packetEndOffset);
const t = topic.toString('utf8');
list.consume(packetEndOffset);
return new PacketPublish(b, l, t, i, p, d);
}
case PACKET_TYPE.CONNECT: {
const buf = list.slice(offset, offset + l);
list.consume(packetEndOffset);
offset = 2 + buf.readUInt16BE(0); // Skip "MQTT" or "MQIsdp" protocol name.
const v = buf.readUInt8(offset++);
const f = buf.readUInt8(offset++);
Expand Down Expand Up @@ -201,71 +202,19 @@ export class MqttDecoder {
this.version = packet.v;
return packet;
}
case PACKET_TYPE.CONNACK: {
const packet = parseConnack(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.PUBACK: {
const packet = parsePuback(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.PUBREC: {
const packet = parsePubrec(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.PUBREL: {
const packet = parsePubrel(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.PUBCOMP: {
const packet = parsePubcomp(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.SUBSCRIBE: {
const packet = parseSubscribe(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.SUBACK: {
const packet = parseSuback(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.UNSUBSCRIBE: {
const packet = parseUnsubscribe(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.UNSUBACK: {
const packet = parseUnsuback(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.PINGREQ: {
const packet = new PacketPingreq(b, l);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.PINGRESP: {
const packet = new PacketPingresp(b, l);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.DISCONNECT: {
const packet = parseDisconnect(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.AUTH: {
const packet = parseAuth(b, l, list, this.version, offset);
list.consume(packetEndOffset);
return packet;
}
case PACKET_TYPE.CONNACK: return parseConnack(b, l, buf, this.version);
case PACKET_TYPE.PUBACK: return parsePuback(b, l, buf, this.version);
case PACKET_TYPE.PUBREC: return parsePubrec(b, l, buf, this.version);
case PACKET_TYPE.PUBREL: return parsePubrel(b, l, buf, this.version);
case PACKET_TYPE.PUBCOMP: return parsePubcomp(b, l, buf, this.version);
case PACKET_TYPE.SUBSCRIBE: return parseSubscribe(b, l, buf, this.version);
case PACKET_TYPE.SUBACK: return parseSuback(b, l, buf, this.version);
case PACKET_TYPE.UNSUBSCRIBE: return parseUnsubscribe(b, l, buf, this.version);
case PACKET_TYPE.UNSUBACK: return parseUnsuback(b, l, buf, this.version);
case PACKET_TYPE.PINGREQ: return new PacketPingreq(b, l);
case PACKET_TYPE.PINGRESP: return new PacketPingresp(b, l);
case PACKET_TYPE.DISCONNECT: return parseDisconnect(b, l, buf, this.version);
case PACKET_TYPE.AUTH: return parseAuth(b, l, buf, this.version);
default: throw ERROR.MALFORMED_PACKET;
}
} catch (error) {
Expand Down
9 changes: 4 additions & 5 deletions src/packets/auth.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketAuthData extends PacketHeaderData {
Expand All @@ -21,8 +20,8 @@ export class PacketAuth extends Packet implements PacketAuthData {
}
}

export const parseAuth = (b: number, l: number, data: BufferList, version: number, offset: number): PacketAuth => {
const c: number = data.readUInt8(offset);
const [p] = parseProps(data, offset + 1);
export const parseAuth = (b: number, l: number, buf: BufferLike, version: number): PacketAuth => {
const c: number = buf.readUInt8(0);
const [p] = parseProps(buf, 1);
return new PacketAuth(b, l, c, p);
};
10 changes: 5 additions & 5 deletions src/packets/connack.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketConnackData extends PacketHeaderData {
Expand Down Expand Up @@ -28,11 +28,11 @@ export class PacketConnack extends Packet implements PacketConnackData {
}
}

export const parseConnack = (b: number, l: number, data: BufferList, version: number, offset: number): PacketConnack => {
const f = data.readUInt8(offset);
const c = data.readUInt8(offset + 1);
export const parseConnack = (b: number, l: number, data: BufferLike, version: number): PacketConnack => {
const f = data.readUInt8(0);
const c = data.readUInt8(1);
let p: Properties = {};
if (version === 5) [p] = parseProps(data, offset + 2);
if (version === 5) [p] = parseProps(data, 2);
const packet = new PacketConnack(b, l, f, c, p);
return packet;
};
9 changes: 4 additions & 5 deletions src/packets/disconnect.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketDisconnectData extends PacketHeaderData {
Expand All @@ -21,13 +20,13 @@ export class PacketDisconnect extends Packet implements PacketDisconnectData {
}
}

export const parseDisconnect = (b: number, l: number, data: BufferList, version: number, offset: number): PacketDisconnect => {
export const parseDisconnect = (b: number, l: number, buf: BufferLike, version: number): PacketDisconnect => {
let c: number = 0;
let p: Properties = {};
if (version === 5) {
c = data.readUInt8(offset);
c = buf.readUInt8(0);
if (l > 1) {
[p] = parseProps(data, offset + 1);
[p] = parseProps(buf, 1);
}
}
return new PacketDisconnect(b, l, c, p);
Expand Down
11 changes: 5 additions & 6 deletions src/packets/puback.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketPubackData extends PacketHeaderData {
Expand All @@ -24,14 +23,14 @@ export class PacketPuback extends Packet implements PacketPubackData {
}
}

export const parsePuback = (b: number, l: number, data: BufferList, version: number, offset: number): PacketPuback => {
const i = data.readUInt16BE(offset);
export const parsePuback = (b: number, l: number, data: BufferLike, version: number): PacketPuback => {
const i = data.readUInt16BE(0);
let c: number = 0;
let p: Properties = {};
if (version === 5) {
c = data.readUInt8(offset + 2);
c = data.readUInt8(2);
if (l > 3) {
[p] = parseProps(data, offset + 3);
[p] = parseProps(data, 3);
}
}
return new PacketPuback(b, l, i, c, p);
Expand Down
11 changes: 5 additions & 6 deletions src/packets/pubcomp.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketPubcompData extends PacketHeaderData {
Expand All @@ -24,14 +23,14 @@ export class PacketPubcomp extends Packet implements PacketPubcompData {
}
}

export const parsePubcomp = (b: number, l: number, data: BufferList, version: number, offset: number): PacketPubcomp => {
const i = data.readUInt16BE(offset);
export const parsePubcomp = (b: number, l: number, data: BufferLike, version: number): PacketPubcomp => {
const i = data.readUInt16BE(0);
let c: number = 0;
let p: Properties = {};
if (version === 5) {
c = data.readUInt8(offset + 2);
c = data.readUInt8(2);
if (l > 3) {
[p] = parseProps(data, offset + 3);
[p] = parseProps(data, 3);
}
}
return new PacketPubcomp(b, l, i, c, p);
Expand Down
10 changes: 5 additions & 5 deletions src/packets/pubrec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketPubrecData extends PacketHeaderData {
Expand All @@ -24,14 +24,14 @@ export class PacketPubrec extends Packet implements PacketPubrecData {
}
}

export const parsePubrec = (b: number, l: number, data: BufferList, version: number, offset: number): PacketPubrec => {
const i = data.readUInt16BE(offset);
export const parsePubrec = (b: number, l: number, data: BufferLike, version: number): PacketPubrec => {
const i = data.readUInt16BE(0);
let c: number = 0;
let p: Properties = {};
if (version === 5) {
c = data.readUInt8(offset + 2);
c = data.readUInt8(2);
if (l > 3) {
[p] = parseProps(data, offset + 3);
[p] = parseProps(data, 3);
}
}
return new PacketPubrec(b, l, i, c, p);
Expand Down
11 changes: 5 additions & 6 deletions src/packets/pubrel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketPubrelData extends PacketHeaderData {
Expand All @@ -24,14 +23,14 @@ export class PacketPubrel extends Packet implements PacketPubrelData {
}
}

export const parsePubrel = (b: number, l: number, data: BufferList, version: number, offset: number): PacketPubrel => {
const i = data.readUInt16BE(offset);
export const parsePubrel = (b: number, l: number, data: BufferLike, version: number): PacketPubrel => {
const i = data.readUInt16BE(0);
let c: number = 0;
let p: Properties = {};
if (version === 5) {
c = data.readUInt8(offset + 2);
c = data.readUInt8(2);
if (l > 3) {
[p] = parseProps(data, offset + 3);
[p] = parseProps(data, 3);
}
}
return new PacketPubrel(b, l, i, c, p);
Expand Down
14 changes: 7 additions & 7 deletions src/packets/suback.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseProps} from '../util/parse';

export interface PacketSubackData extends PacketHeaderData {
Expand All @@ -24,19 +23,20 @@ export class PacketSuback extends Packet implements PacketSubackData {
}
}

export const parseSuback = (b: number, l: number, data: BufferList, version: number, offset: number): PacketSuback => {
const i = data.readUInt16BE(offset);
export const parseSuback = (b: number, l: number, buf: BufferLike, version: number): PacketSuback => {
let offset = 0;
const i = buf.readUInt16BE(offset);
offset += 2;
let p: Properties = {};
if (version === 5) {
const [props, size] = parseProps(data, offset);
const [props, size] = parseProps(buf, offset);
p = props;
offset += size;
}
const len = data.length;
const len = buf.length;
const s: number[] = [];
while (offset < len) {
s.push(data.readUInt8(offset++));
s.push(buf.readUInt8(offset++));
}
return new PacketSuback(b, l, i, p, s);
};
16 changes: 8 additions & 8 deletions src/packets/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {BufferList} from '../BufferList';
import {Packet, PacketHeaderData} from '../packet';
import {Properties} from '../types';
import {BufferLike, Properties} from '../types';
import {parseBinary, parseProps} from '../util/parse';

export interface PacketSubscribeData extends PacketHeaderData {
Expand Down Expand Up @@ -51,21 +50,22 @@ export class Subscription implements SubscriptionData {
}
}

export const parseSubscribe = (b: number, l: number, data: BufferList, version: number, offset: number): PacketSubscribe => {
const i = data.readUInt16BE(offset);
export const parseSubscribe = (b: number, l: number, buf: BufferLike, version: number): PacketSubscribe => {
let offset = 0;
const i = buf.readUInt16BE(offset);
offset += 2;
let p: Properties = {};
if (version === 5) {
const [props, size] = parseProps(data, offset);
const [props, size] = parseProps(buf, offset);
p = props;
offset += size;
}
const len = data.length;
const len = buf.length;
const s: Subscription[] = [];
while (offset < len) {
const topic = parseBinary(data, offset);
const topic = parseBinary(buf, offset);
offset += 2 + topic.byteLength;
const f = data.readUInt8(offset);
const f = buf.readUInt8(offset);
offset++;
s.push(new Subscription(topic.toString('utf8'), f));
}
Expand Down
Loading

0 comments on commit b799c8e

Please sign in to comment.