Skip to content

Commit

Permalink
feat: 🎸 add subscribe packet encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Oct 6, 2020
1 parent 77816a6 commit 367867e
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 4 deletions.
151 changes: 151 additions & 0 deletions src/packets/__tests__/subscribe.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import {PACKET_TYPE, PROPERTY} from '../../enums';
import {PacketSubscribe, Subscription} from '../subscribe';
import {MqttDecoder} from '../../MqttDecoder';

test('can create a packet', () => {
const packet = PacketSubscribe.create(0, {});
expect(packet.b >> 4).toBe(PACKET_TYPE.SUBSCRIBE);
expect(packet.l).toBe(0);
expect(packet.i).toBe(0);
expect(packet.p).toEqual({});
expect(packet.s).toEqual([]);
});

test('can add a subscription', () => {
const packet = PacketSubscribe.create(0, {});
expect(packet.s).toEqual([]);
packet.addSubscription(Subscription.create('foo', 0, false, false, 0));
expect(packet.s).toEqual([
{t: 'foo', f: 0},
]);
packet.addSubscription(Subscription.create('bar', 0, false, false, 0));
expect(packet.s).toEqual([
{t: 'foo', f: 0},
{t: 'bar', f: 0},
]);
});

test('can serialize basic packet MQTT 5.0', () => {
const packet = PacketSubscribe.create(0, {});
packet.addSubscription(Subscription.create('topic', 0, false, false, 0));
expect(packet instanceof PacketSubscribe).toBe(true);
const buf = packet.toBuffer(5);
expect(buf).toEqual(Buffer.from([
0x80, // Header
0x0b, // Remaining length
0x00, 0x00, // Packet Identifier
0x00, // Properties
0x00, 0x05, // "topic" length
0x74, 0x6f, 0x70, 0x69, 0x63, // "topic"
0x00, // Subscription flags
]));
});

test('can serialize basic packet MQTT 3.1.1', () => {
const packet = PacketSubscribe.create(0, {});
packet.addSubscription(Subscription.create('topic', 0, false, false, 0));
expect(packet instanceof PacketSubscribe).toBe(true);
const buf = packet.toBuffer(4);
expect(buf).toEqual(Buffer.from([
0x80, // Header
0x0a, // Remaining length
0x00, 0x00, // Packet Identifier
0x00, 0x05, // "topic" length
0x74, 0x6f, 0x70, 0x69, 0x63, // "topic"
0x00, // Subscription flags
]));
});

test('can serialize a packet with two subscriptions', () => {
const packet = PacketSubscribe.create(0, {});
packet.addSubscription(Subscription.create('topic', 0, false, false, 0));
packet.addSubscription(Subscription.create('topid', 1, true, true, 1));
expect(packet instanceof PacketSubscribe).toBe(true);
const buf = packet.toBuffer(5);
expect(buf).toEqual(Buffer.from([
0x80, // Header
0x13, // Remaining length
0x00, 0x00, // Packet Identifier
0x00, // Properties
0x00, 0x05, // "topic" length
0x74, 0x6f, 0x70, 0x69, 0x63, // "topic"
0x00, // Subscription flags
0x00, 0x05, // "topid" length
0x74, 0x6f, 0x70, 0x69, 0x64, // "topid"
0x1d, // Subscription flags
]));
});

test('can serialize packet and deserialize it back', () => {
const packet1 = PacketSubscribe.create(1, {});
packet1.addSubscription(Subscription.create('a', 0, true, true, 0));
packet1.addSubscription(Subscription.create('b', 1, false, true, 2));
packet1.addSubscription(Subscription.create('c', 2, true, false, 1));
const decoder = new MqttDecoder();
decoder.version = 5;
decoder.push(packet1.toBuffer(5));
const packet2 = decoder.parse()! as PacketSubscribe;
expect(packet2).toEqual(packet1);
expect(packet2.i).toEqual(1);
expect(packet2.p).toEqual({});
expect(packet2.s.length).toEqual(3);
expect(packet2.s[1].noLocal()).toBe(false);
});

test('can serialize packet and deserialize it with props', () => {
const packet1 = PacketSubscribe.create(1, {
[PROPERTY.MaximumPacketSize]: 123,
});
const decoder = new MqttDecoder();
decoder.version = 5;
decoder.push(packet1.toBuffer(5));
const packet2 = decoder.parse()! as PacketSubscribe;
expect(packet2).toEqual(packet1);
expect(packet2.p).toEqual({
[PROPERTY.MaximumPacketSize]: 123,
});
});

describe('Subscription', () => {
test('can create a subscription', () => {
const sub = Subscription.create('topic', 0, false, false, 0);
expect(sub.t).toBe('topic');
expect(sub.f).toBe(0);
expect(sub.qualityOfService()).toBe(0);
expect(sub.noLocal()).toBe(false);
expect(sub.retainAsPublished()).toBe(false);
expect(sub.retainHandling()).toBe(0);
});

test('can set QoS', () => {
const sub1 = Subscription.create('topic', 0, false, false, 0);
expect(sub1.qualityOfService()).toBe(0);
const sub2 = Subscription.create('topic', 1, false, false, 0);
expect(sub2.qualityOfService()).toBe(1);
const sub3 = Subscription.create('topic', 2, false, false, 0);
expect(sub3.qualityOfService()).toBe(2);
});

test('can set noLocal', () => {
const sub1 = Subscription.create('topic', 0, false, false, 0);
expect(sub1.noLocal()).toBe(false);
const sub2 = Subscription.create('topic', 0, true, false, 0);
expect(sub2.noLocal()).toBe(true);
});

test('can set retainAsPublished', () => {
const sub1 = Subscription.create('topic', 0, false, true, 0);
expect(sub1.retainAsPublished()).toBe(true);
const sub2 = Subscription.create('topic', 0, true, false, 0);
expect(sub2.retainAsPublished()).toBe(false);
});

test('can set retainHandling', () => {
const sub1 = Subscription.create('topic', 0, false, false, 0);
expect(sub1.retainHandling()).toBe(0);
const sub2 = Subscription.create('topic', 0, false, false, 1);
expect(sub2.retainHandling()).toBe(1);
const sub3 = Subscription.create('topic', 0, false, false, 2);
expect(sub3.retainHandling()).toBe(2);
});
});
63 changes: 63 additions & 0 deletions src/packets/subscribe/encodeSubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import {PacketSubscribe} from '.';
import {genProps} from '../../util/genProps';

export const encodeSubscribe = (packet: PacketSubscribe, version: number): Buffer => {
const {b, i, p, s} = packet;
const isV5 = version === 5;

const props = isV5 ? genProps(p) : null;
const propsLength = props ? props.length : 0;

let remainingLength: number = 2 + propsLength;
for (const sub of s) remainingLength += 3 + Buffer.byteLength(sub.t);
packet.l = remainingLength;

const remainingLengthSize = remainingLength < 128 ? 1 : remainingLength < 16_384 ? 2 : remainingLength < 2_097_152 ? 3 : 4;
const bufferLength = 1 + remainingLengthSize + remainingLength;
const buf = Buffer.allocUnsafe(bufferLength);

buf.writeUInt8(b, 0);

let offset = 1;

switch (remainingLengthSize) {
case 1:
buf.writeUInt8(remainingLength, 1);
offset = 2;
break;
case 2:
buf.writeUInt16LE(((remainingLength & 0b011111110000000) << 1) | (0b10000000 | (remainingLength & 0b01111111)), 1);
offset = 3;
break;
case 3:
buf.writeUInt16LE(((0b100000000000000 | (remainingLength & 0b011111110000000)) << 1) | (0b10000000 | (remainingLength & 0b01111111)), 1);
buf.writeUInt8((remainingLength >> 14) & 0b01111111, 3);
offset = 4;
break;
case 4:
buf.writeUInt32LE((((((remainingLength >> 21) & 0b01111111) << 8) | (0b10000000 | ((remainingLength >> 14) & 0b01111111))) << 16) |
((0b100000000000000 | (remainingLength & 0b011111110000000)) << 1) | (0b10000000 | (remainingLength & 0b01111111)), 1);
offset = 5;
break;
}

buf.writeUInt16BE(i, offset);
offset += 2;

if (isV5) {
props!.copy(buf, offset);
offset += propsLength;
}

for (const {t, f} of s) {
const len = Buffer.byteLength(t);
buf.writeUInt16BE(len, offset);
offset += 2;
buf.write(t, offset);
offset += len;
buf.writeUInt8(f, offset);
offset += 1;
}

return buf;
}
31 changes: 27 additions & 4 deletions src/packets/subscribe.ts → src/packets/subscribe/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import {Packet, PacketHeaderData} from '../packet';
import {BufferLike, Properties} from '../types';
import {parseBinary} from '../util/parse';
import {parseProps} from '../util/parseProps';
import {PACKET_TYPE} from '../../enums';
import {Packet, PacketHeaderData} from '../../packet';
import {BufferLike, Properties, QoS} from '../../types';
import {parseBinary} from '../../util/parse';
import {parseProps} from '../../util/parseProps';
import {encodeSubscribe} from './encodeSubscribe';

export interface PacketSubscribeData extends PacketHeaderData {
/** Packet Identifier. */
Expand All @@ -20,6 +22,14 @@ export interface SubscriptionData {
}

export class PacketSubscribe extends Packet implements PacketSubscribeData {
/**
* @param i Packet Identifier
* @param p Properties
*/
static create(i: number, p: Properties) {
return new PacketSubscribe(PACKET_TYPE.SUBSCRIBE << 4, 0, i, p, []);
}

constructor(
b: number,
l: number,
Expand All @@ -29,9 +39,22 @@ export class PacketSubscribe extends Packet implements PacketSubscribeData {
) {
super(b, l);
}

public addSubscription(subscription: Subscription) {
this.s.push(subscription);
}

public toBuffer(version: number) {
return encodeSubscribe(this, version);
}
}

export class Subscription implements SubscriptionData {
static create(t: string, qualityOfService: QoS, noLocal: boolean, retainAsPublished: boolean, retainHandling: number): Subscription {
const flag = ((((((retainHandling & 0b11) << 1) | (retainAsPublished ? 1 : 0)) << 1) | (noLocal ? 1 : 0)) << 2) | (qualityOfService & 0b11);
return new Subscription(t, flag);
}

constructor (public t: string, public f: number) {}

public qualityOfService(): number {
Expand Down

0 comments on commit 367867e

Please sign in to comment.