Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add types definitions #76

Merged
merged 11 commits into from
Mar 5, 2021
4 changes: 2 additions & 2 deletions abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ function abstractPersistence (opts) {
function testPacket (t, packet, expected) {
if (packet.messageId === null) packet.messageId = undefined
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
t.deepEqual(packet, expected, 'must return the packet')
t.deepLooseEqual(packet, expected, 'must return the packet')
}

test('store and look up retained messages', function (t) {
Expand Down Expand Up @@ -1256,7 +1256,7 @@ function abstractPersistence (opts) {
delete retrieved.brokerId
delete packet.length

t.deepEqual(retrieved, packet, 'retrieved packet must be deeply equal')
t.deepLooseEqual(retrieved, packet, 'retrieved packet must be deeply equal')
t.notEqual(retrieved, packet, 'retrieved packet must not be the same objet')

instance.incomingDelPacket(client, retrieved, function (err) {
Expand Down
8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
"version": "8.1.2",
"description": "The spec for an Aedes persistence, with abstract tests and a fast in-memory implementation.",
"main": "persistence.js",
"types": "types/index.d.ts",
"scripts": {
"lint": "standard --verbose | snazzy",
"lint-fix": "standard --fix",
"unit": "tape test.js | faucet",
"test": "npm run lint && npm run unit",
"test:types": "tsd",
"test": "npm run lint && npm run unit && tsd",
"coverage": "nyc --reporter=lcov tape test.js",
"test:ci": "npm run lint && npm run coverage",
"license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause'",
Expand Down Expand Up @@ -58,6 +60,7 @@
"node": ">=10"
},
"devDependencies": {
"aedes": "^0.45.0",
"concat-stream": "^2.0.0",
"faucet": "0.0.1",
"license-checker": "^25.0.1",
Expand All @@ -69,7 +72,8 @@
"snazzy": "^9.0.0",
"standard": "^15.0.1",
"tape": "^5.2.1",
"through2": "^4.0.2"
"through2": "^4.0.2",
"tsd": "^0.14.0"
},
"dependencies": {
"aedes-packet": "^2.3.1",
Expand Down
270 changes: 270 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
import type { Brokers, Client, Subscription } from 'aedes';
import type { AedesPacket } from 'aedes-packet';
import type { QoS } from 'mqtt-packet';
import type { Readable } from 'stream';

export type { AedesPacket as Packet } from 'aedes-packet';

export interface AedesPersistenceSubscription {
clientId: string;
topic: string;
qos?: QoS;
}
getlarge marked this conversation as resolved.
Show resolved Hide resolved

export type CallbackError = Error | null | undefined;

export type WillPacket = AedesPacket & { [key: string]: any };

interface Incoming {
[clientId: string]: { [messageId: string]: AedesPacket };
}

export interface AedesPersistence {
storeRetained: (
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

createRetainedStream: (pattern: string) => Readable;

createRetainedStreamCombi: (patterns: string[]) => Readable;

addSubscriptions: (
client: Client,
subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
) => void;

removeSubscriptions: (
client: Client,
subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
) => void;

subscriptionsByClient: (
client: Client,
cb: (
error: CallbackError,
subs: { topic: string; qos: QoS }[],
client: Client
) => void
) => void;

countOffline: (
cb: (
error: CallbackError,
subscriptionsCount: number,
clientsCount: number
) => void
) => void;

subscriptionsByTopic: (
pattern: string,
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void
) => void;

cleanSubscriptions: (
client: Client,
cb: (error: CallbackError, client: Client) => void
) => void;

outgoingEnqueue: (
sub: { clientId: string },
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

outgoingEnqueueCombi: (
subs: { clientId: string }[],
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

outgoingUpdate: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void
) => void;

outgoingClearMessageId: (
client: Client,
packet: AedesPacket,
cb: (error?: CallbackError, packet?: AedesPacket) => void
) => void;

outgoingStream: (client: Client) => Readable;

incomingStorePacket: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

incomingGetPacket: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError, packet: AedesPacket) => void
) => void;

incomingDelPacket: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

putWill: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError, client: Client) => void
) => void;

getWill: (
client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
) => void;

delWill: (
client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
) => void;

streamWill: (brokers: Brokers) => Readable;

getClientList: (topic: string) => Readable;

destroy: (cb?: (error: CallbackError) => void) => void;
}

export class AedesMemoryPersistence implements AedesPersistence {
_retained: AedesPacket[];
_subscriptions: Map<
AedesPersistenceSubscription['clientId'],
Map<
AedesPersistenceSubscription['topic'],
AedesPersistenceSubscription['qos']
>
>;
_clientsCount: number;
_trie: any;
_outgoing: Record<string, AedesPacket[]>;
_incoming: Incoming;
_wills: Record<string, WillPacket>;

constructor();

storeRetained: (
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

createRetainedStream: (pattern: string) => Readable;

createRetainedStreamCombi: (patterns: string[]) => Readable;

addSubscriptions: (
client: Client,
subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
) => void;

removeSubscriptions: (
client: Client,
subs: Subscription[],
cb: (error: CallbackError, client: Client) => void
) => void;

subscriptionsByClient: (
client: Client,
cb: (
error: CallbackError,
subs: { topic: string; qos: QoS }[],
getlarge marked this conversation as resolved.
Show resolved Hide resolved
client: Client
) => void
) => void;

countOffline: (
cb: (
error: CallbackError,
subscriptionsCount: number,
clientsCount: number
) => void
) => void;

subscriptionsByTopic: (
pattern: string,
cb: (error: CallbackError, subs: AedesPersistenceSubscription[]) => void
) => void;

cleanSubscriptions: (
client: Client,
cb: (error: CallbackError, client: Client) => void
) => void;

outgoingEnqueue: (
sub: { clientId: string },
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

outgoingEnqueueCombi: (
sub: { clientId: string }[],
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

outgoingUpdate: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError, client: Client, packet: AedesPacket) => void
) => void;

outgoingClearMessageId: (
client: Client,
packet: AedesPacket,
cb: (error?: CallbackError, packet?: AedesPacket) => void
) => void;

outgoingStream: (client: Client) => Readable;

incomingStorePacket: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

incomingGetPacket: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError, packet: AedesPacket) => void
) => void;

incomingDelPacket: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError) => void
) => void;

putWill: (
client: Client,
packet: AedesPacket,
cb: (error: CallbackError, client: Client) => void
) => void;

getWill: (
client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
) => void;

delWill: (
client: Client,
cb: (error: CallbackError, will: WillPacket, client: Client) => void
) => void;

streamWill: (brokers: Brokers) => Readable;

getClientList: (topic: string) => Readable;

destroy: (cb?: (error: CallbackError) => void) => void;
}

export default function aedesMemoryPersistence(): AedesMemoryPersistence;
Loading