Skip to content

Commit

Permalink
feat(middleware): implement MiddlewareSubscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
davidyuk committed Jun 5, 2023
1 parent 583a543 commit b51b0a3
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 9 deletions.
1 change: 1 addition & 0 deletions commitlint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module.exports = {
'channel',
'compiler',
'contract',
'middleware',
'deps',
'deps-dev',
'node',
Expand Down
53 changes: 46 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,24 @@
"@types/uuid": "^9.0.1",
"@types/webextension-polyfill": "^0.10.0",
"@types/websocket": "^1.0.5",
"@types/ws": "^8.5.4",
"aes-js": "^3.1.2",
"bignumber.js": "^9.1.1",
"bip32-path": "^0.4.2",
"blakejs": "^1.2.1",
"bs58": "^5.0.0",
"buffer": "^6.0.3",
"events": "^3.3.0",
"isomorphic-ws": "^5.0.0",
"json-bigint": "^1.0.0",
"process": "^0.11.10",
"rlp": "^3.0.0",
"sha.js": "^2.4.11",
"tweetnacl": "^1.0.3",
"tweetnacl-auth": "^1.0.1",
"varuint-bitcoin": "^1.1.2",
"websocket": "^1.0.34"
"websocket": "^1.0.34",
"ws": "^8.13.0"
},
"repository": {
"type": "git",
Expand Down
207 changes: 207 additions & 0 deletions src/MiddlewareSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/* eslint-disable max-classes-per-file */
import WebSocket from 'isomorphic-ws';
import { BaseError, UnexpectedTsError, InternalError } from './utils/errors';
import { Encoded } from './utils/encoder';

interface Message {
payload: Object;
source: Source.Middleware | Source.Node;
subscription: 'KeyBlocks' | 'MicroBlocks' | 'Transactions' | 'Object';
target?: string;
}

enum Source {
Middleware = 'mdw',
Node = 'node',
All = 'all',
}

export class MiddlewareSubscriberError extends BaseError {
constructor(message: string) {
super(message);
this.name = 'MiddlewareSubscriberError';
}
}

export class MiddlewareSubscriberDisconnected extends MiddlewareSubscriberError {
constructor(readonly closeEvent: WebSocket.CloseEvent) {
super('Connection closed');
this.name = 'MiddlewareSubscriberDisconnected';
}
}

export default class MiddlewareSubscriber {
#subscriptions: Array<
readonly [target: string, s: Source, cb: (p?: Object, e?: Error) => void]
> = [];

#requestQueue: Array<[isSubscribe: boolean, target: string]> = [];

#webSocket?: WebSocket;

get webSocket(): WebSocket | undefined {
return this.#webSocket;
}

get #targets(): Set<string> {
return new Set(this.#subscriptions.map(([target]) => target));
}

#sendMessage(message: any): void {
if (this.#webSocket == null) throw new UnexpectedTsError();
this.#webSocket.send(JSON.stringify(message));
}

#sendSubscribe(isSubscribe: boolean, target: string): void {
if (this.#webSocket == null) return;
const payload = ['KeyBlocks', 'MicroBlocks', 'Transactions'].includes(target)
? target : 'Object';
this.#sendMessage({
op: isSubscribe ? 'Subscribe' : 'Unsubscribe',
payload,
...payload === 'Object' && { target },
});
this.#requestQueue.push([isSubscribe, target]);
}

#emit(condition: (target: string, source: Source) => boolean, p?: Object, e?: Error): void {
this.#subscriptions
.filter(([target, source]) => condition(target, source))
.forEach(([, , cb]) => cb(p, e));
}

constructor(readonly url: string) {
}

#disconnect(onlyReset = false): void {
if (this.#webSocket == null) return;
if (!onlyReset) this.#webSocket.close();
Object.assign(this.#webSocket, {
onopen: undefined,
onerror: undefined,
onmessage: undefined,
});
this.#webSocket = undefined;
}

async reconnect(): Promise<void> {
this.#disconnect();
this.#webSocket = await new Promise((resolve) => {
const webSocket = new WebSocket(this.url);
Object.assign(webSocket, {
onopen: () => resolve(webSocket),
onerror: (errorEvent: WebSocket.ErrorEvent) => {
this.#emit(() => true, undefined, errorEvent.error);
},
onmessage: (event: WebSocket.MessageEvent) => {
if (typeof event.data !== 'string') {
throw new InternalError(`Unknown incoming message type: ${typeof event.data}`);
}
this.#messageHandler(JSON.parse(event.data));
},
onclose: (event: WebSocket.CloseEvent) => {
this.#emit(() => true, undefined, new MiddlewareSubscriberDisconnected(event));
this.#disconnect(true);
},
});
});
await Promise.all([...this.#targets].map((target) => this.#sendSubscribe(true, target)));
}

#messageHandler(message: string | string[] | Message): void {
if (typeof message === 'string' || Array.isArray(message)) {
const request = this.#requestQueue.shift();
if (request == null) throw new InternalError('Request queue is empty');
const [isSubscribe, target] = request;
let error;
if (typeof message === 'string') error = new MiddlewareSubscriberError(message);
if (message.includes(target) !== isSubscribe) {
error = new InternalError(`Expected ${target} to be${isSubscribe ? '' : ' not'} included into ${message}`);
}
if (error != null) this.#emit((t) => target === t, undefined, error);
return;
}
this.#emit(
(target, source) => (target === message.subscription || target === message.target)
&& (source === message.source || source === Source.All),
message.payload,
);
}

#subscribe(target: string, source: Source, cb: (p?: Object, e?: Error) => void): () => void {
const subscription = [target, source, cb] as const;
if (this.#targets.size === 0) this.reconnect();
if (!this.#targets.has(target)) this.#sendSubscribe(true, target);
this.#subscriptions.push(subscription);
return () => {
this.#subscriptions = this.#subscriptions.filter((item) => item !== subscription);
if (!this.#targets.has(target)) this.#sendSubscribe(false, target);
if (this.#targets.size === 0) this.#disconnect();
};
}

// TODO: replace p?: any with a proper type definition

subscribeKeyBlocks(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('KeyBlocks', Source.Middleware, cb);
}

subscribeKeyBlocksNode(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('KeyBlocks', Source.Node, cb);
}

subscribeKeyBlocksAll(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('KeyBlocks', Source.All, cb);
}

subscribeMicroBlocks(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('MicroBlocks', Source.Middleware, cb);
}

subscribeMicroBlocksNode(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('MicroBlocks', Source.Node, cb);
}

subscribeMicroBlocksAll(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('MicroBlocks', Source.All, cb);
}

subscribeTransactions(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('Transactions', Source.Middleware, cb);
}

subscribeTransactionsNode(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('Transactions', Source.Node, cb);
}

subscribeTransactionsAll(cb: (p?: any, e?: Error) => void): () => void {
return this.#subscribe('Transactions', Source.All, cb);
}

subscribeObject(
target: Encoded.KeyBlockHash | Encoded.Channel | Encoded.ContractAddress
| Encoded.OracleAddress | Encoded.OracleQueryId | Encoded.AccountAddress
| Encoded.Name | `${string}.chain`,
cb: (p?: any, e?: Error) => void,
): () => void {
return this.#subscribe(target, Source.Middleware, cb);
}

subscribeObjectNode(
target: Encoded.KeyBlockHash | Encoded.Channel | Encoded.ContractAddress
| Encoded.OracleAddress | Encoded.OracleQueryId | Encoded.AccountAddress
| Encoded.Name | `${string}.chain`,
cb: (p?: any, e?: Error) => void,
): () => void {
return this.#subscribe(target, Source.Node, cb);
}

subscribeObjectAll(
target: Encoded.KeyBlockHash | Encoded.Channel | Encoded.ContractAddress
| Encoded.OracleAddress | Encoded.OracleQueryId | Encoded.AccountAddress
| Encoded.Name | `${string}.chain`,
cb: (p?: any, e?: Error) => void,
): () => void {
return this.#subscribe(target, Source.All, cb);
}
}
2 changes: 1 addition & 1 deletion src/channel/Base.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventEmitter } from 'events';
import EventEmitter from 'events';
import { w3cwebsocket as W3CWebSocket } from 'websocket';
import { snakeToPascal } from '../utils/string';
import { buildTx, unpackTx } from '../tx/builder';
Expand Down
5 changes: 5 additions & 0 deletions src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ export { default as AccountLedgerFactory } from './account/LedgerFactory';
export { default as CompilerBase } from './contract/compiler/Base';
export { default as CompilerHttp } from './contract/compiler/Http';
export { default as Channel } from './channel/Contract';
export {
default as _MiddlewareSubscriber,
MiddlewareSubscriberError as _MiddlewareSubscriberError,
MiddlewareSubscriberDisconnected as _MiddlewareSubscriberDisconnected,
} from './MiddlewareSubscriber';

export { default as connectionProxy } from './aepp-wallet-communication/connection-proxy';
export * from './aepp-wallet-communication/schema';
Expand Down
Loading

0 comments on commit b51b0a3

Please sign in to comment.