Skip to content

Commit

Permalink
feat: 🎸 implement header parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Oct 3, 2020
1 parent 10edde6 commit c7af465
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 45 deletions.
86 changes: 65 additions & 21 deletions src/MqttDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ import BufferList from 'bl';
import {DECODER_STATE} from './enums';
import {MqttPacket} from './packet';

export type MqttDecoderOnPacket = (packet: any) => void;

export class MqttDecoder {
public state: DECODER_STATE = DECODER_STATE.HEADER;
public state: DECODER_STATE = DECODER_STATE.BYTE;
public error: null | Error = null;
public list = new BufferList();
public packet = new MqttPacket();

constructor (private readonly onPacket: MqttDecoderOnPacket) {}
constructor () {}

public push (buf: Buffer) {
this.list.append(buf);
Expand All @@ -21,34 +19,80 @@ export class MqttDecoder {
}

public reset () {
this.state = DECODER_STATE.HEADER;
this.error = null
this.state = DECODER_STATE.BYTE;
this.error = null;
this.list = new BufferList();
this.packet = new MqttPacket();
}

public parse (buf: Buffer) {
if (this.error) this.reset();

this.list.append(buf)
public parse (): MqttPacket | null {
this.parseFixedHeader();
this.parseVariableData();

return this.parsePacket();
const packet = this.packet;
const isPacketEmpty = !packet.b;
const isParsingInProgress = this.state !== DECODER_STATE.BYTE;
if (isPacketEmpty || isParsingInProgress) return null;
this.packet = new MqttPacket();
return packet;
}

public parsePacket () {
if (this.state === DECODER_STATE.HEADER) {
this.parseHeader();
}
public parseFixedHeader () {
this.parseFirstByte();
this.parseLength();
}

public parseHeader () {
public parseFirstByte () {
if (this.state !== DECODER_STATE.BYTE) return;
const list = this.list;
if (!list.length) return;
const byte = list.readUInt8(0);
this.packet.b = list.readUInt8(0);
list.consume(1);
this.state = DECODER_STATE.LENGTH;
const packet = this.packet;
packet.t = byte >> 4;
packet.f = byte & 0b1111;
this.state = DECODER_STATE.LEN;
}

public parseLength () {
if (this.state !== DECODER_STATE.LEN) return;
const int = this.consumeVarInt();
if (int < 0) return;
this.packet.l = int;
this.state = DECODER_STATE.DATA;
}

private consumeVarInt (): number {
const list = this.list;
const length = list.length;

if (length < 1) return -1;
const b1 = list.readUInt8(0);
if (b1 ^ 0b10000000) {
list.consume(1);
return b1 & 0b01111111;
}

if (length < 2) return -1;
const b2 = list.readUInt8(1);
if (b2 ^ 0b10000000) {
list.consume(2);
return ((b2 & 0b01111111) << 7 * 1) + (b1 & 0b01111111);
}

if (length < 3) return -1;
const b3 = list.readUInt8(2);
if (b3 ^ 0b10000000) {
list.consume(3);
return ((b3 & 0b01111111) << 7 * 2) + ((b2 & 0b01111111) << 7 * 1) + (b1 & 0b01111111);
}

if (length < 4) return -1;
const b4 = list.readUInt8(3);
list.consume(4);
return ((b4 & 0b01111111) << 7 * 3) + ((b3 & 0b01111111) << 7 * 2) + ((b2 & 0b01111111) << 7 * 1) + (b1 & 0b01111111);
}

public parseVariableData () {
if (this.state !== DECODER_STATE.DATA) return;

this.state = DECODER_STATE.BYTE;
}
}
61 changes: 52 additions & 9 deletions src/__tests__/MqttDecoder.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,59 @@
import {MqttDecoder} from '../MqttDecoder';
import { connect } from './util';
import {connect, connectAck, subscribe, subscribeAck} from './util';
import {PACKET_TYPE} from '../enums';

it('can instantiate', () => {
const decoder = new MqttDecoder(() => {});
const decoder = new MqttDecoder();
});

it('can parse header', () => {
const decoder = new MqttDecoder(() => {});
it('can parse first byte packet type of CONNECT packet', () => {
const decoder = new MqttDecoder();
decoder.push(connect);
expect(decoder.packet.t).toBe(0);
expect(decoder.packet.f).toBe(0);
decoder.parseHeader();
expect(decoder.packet.t).toBe(1);
expect(decoder.packet.f).toBe(0);
expect(decoder.packet.b).toBe(0);
expect(decoder.packet.l).toBe(0);
decoder.parseFirstByte();
expect(decoder.packet.b).not.toBe(0);
expect(decoder.packet.l).toBe(0);
expect(decoder.packet.type()).toBe(PACKET_TYPE.CONNECT);
});

it('can parse first byte packet type of CONNACK packet', () => {
const decoder = new MqttDecoder();
decoder.push(connectAck);
expect(decoder.packet.b).toBe(0);
expect(decoder.packet.l).toBe(0);
decoder.parseFirstByte();
expect(decoder.packet.b).not.toBe(0);
expect(decoder.packet.l).toBe(0);
expect(decoder.packet.type()).toBe(PACKET_TYPE.CONNACK);
});

it('can parse CONNECT packet fixed header', () => {
const decoder = new MqttDecoder();
decoder.push(connect);
const packet = decoder.parse();
expect(packet!.type()).toBe(PACKET_TYPE.CONNECT);
expect(packet!.dup()).toBe(false);
expect(packet!.qos()).toBe(0);
expect(packet!.retain()).toBe(false);
});

it('can parse CONNACK packet fixed header', () => {
const decoder = new MqttDecoder();
decoder.push(connectAck);
const packet = decoder.parse();
expect(packet!.type()).toBe(PACKET_TYPE.CONNACK);
expect(packet!.dup()).toBe(false);
expect(packet!.qos()).toBe(0);
expect(packet!.retain()).toBe(false);
});

it('can parse SUBACK packet fixed header', () => {
const decoder = new MqttDecoder();
decoder.push(subscribeAck);
const packet = decoder.parse();
expect(packet!.type()).toBe(PACKET_TYPE.SUBACK);
expect(packet!.dup()).toBe(false);
expect(packet!.qos()).toBe(0);
expect(packet!.retain()).toBe(false);
});
6 changes: 3 additions & 3 deletions src/enums.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export const enum DECODER_STATE {
HEADER = 0,
LENGTH = 1,
PAYLOAD = 2,
BYTE = 0,
LEN = 1,
DATA = 2,
}

export const enum PACKET_TYPE {
Expand Down
33 changes: 21 additions & 12 deletions src/packet.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
import {PACKET_TYPE} from './enums';

export interface IPacket {
/** Packet type. */
t: PACKET_TYPE;
/** Fixed header flags. */
f: number;
export interface MqttPacketHeaderData {
/** Packet first byte. */
b: number;
/** Variable length. */
l: number;
key: string | null;
dat: Buffer | null;
}

export class MqttPacket implements IPacket {
public t: PACKET_TYPE = PACKET_TYPE.RESERVED;
public f: number = 0;
export class MqttPacket implements MqttPacketHeaderData {
public b: number = 0;
public l: number = 0;
public key: string | null = null;
public dat: Buffer | null = null;

public type (): PACKET_TYPE {
return this.b >> 4;
}

public dup (): boolean {
return !!(this.b & 0b1000);
}

public qos (): 0 | 1 | 2 {
return ((this.b >> 1) & 0b11) as 0 | 1 | 2;
}

public retain (): boolean {
return !!(this.b & 0b1);
}
}

0 comments on commit c7af465

Please sign in to comment.