Skip to content

Commit

Permalink
feat: queue
Browse files Browse the repository at this point in the history
  • Loading branch information
izatop committed Nov 25, 2020
1 parent f6a91d1 commit 6c4201b
Show file tree
Hide file tree
Showing 34 changed files with 648 additions and 14 deletions.
16 changes: 8 additions & 8 deletions packages/app/src/Route/Route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ export class Route<A extends RouteAction = RouteAction> implements IRoute<A>, IL
);
}

private getRuleArgs(rule: string | RouteRule<A>): { route: string, payload?: Payload<A> } {
if (isString(rule)) {
return {route: rule, payload: undefined};
}

return {route: rule.route, payload: rule};
}

public getLogValue(): { route: string } {
return {route: this.route};
}
Expand All @@ -45,4 +37,12 @@ export class Route<A extends RouteAction = RouteAction> implements IRoute<A>, IL
public match(route: string): Record<string, string> {
return this.#matcher.match(route);
}

private getRuleArgs(rule: string | RouteRule<A>): { route: string, payload?: Payload<A> } {
if (isString(rule)) {
return {route: rule, payload: undefined};
}

return {route: rule.route, payload: rule};
}
}
34 changes: 34 additions & 0 deletions packages/queue/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"name": "@typesafeunit/queue",
"version": "0.1.0",
"keywords": [
"typescript"
],
"author": {
"name": "Artur Bier",
"email": "[email protected]"
},
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": [
"dist/"
],
"description": "Queue",
"repository": "[email protected]:izatop/typesafeunit.git",
"scripts": {
"clean": "yarn build:clean",
"watch": "yarn build:watch",
"build": "yarn build:clean && tsc",
"build:clean": "rimraf dist tsconfig.tsbuildinfo",
"build:watch": "yarn build --watch"
},
"publishConfig": {
"access": "public"
},
"dependencies": {
"@types/ioredis": "^4.17.8",
"@typesafeunit/util": "^0.10.11",
"ioredis": "^4.19.2"
},
"license": "MIT"
}
19 changes: 19 additions & 0 deletions packages/queue/src/Queue/ChannelAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import {IChannel, ITransaction, Message, MessageCtor} from "./interfaces";

export abstract class ChannelAbstract<M extends Message> implements IChannel<M> {
public abstract readonly subscribed: boolean;

public readonly type: MessageCtor<M>;

constructor(type: MessageCtor<M>) {
this.type = type;
}

public get key(): string {
return this.type.channel;
}

public abstract subscribe(): AsyncGenerator<ITransaction<M>>;

public abstract unsubscribe(): void;
}
17 changes: 17 additions & 0 deletions packages/queue/src/Queue/MessageAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export class MessageAbstract<T> {
public id?: string;

public readonly payload: T;

constructor(payload: T) {
this.payload = payload;
}

public get channel(): string {
return this.id ?? this.constructor.name;
}

public static get channel(): string {
return this.prototype.channel;
}
}
23 changes: 23 additions & 0 deletions packages/queue/src/Queue/QueueAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {IMessageHandler, ITransport, Message, MessageCtor} from "./interfaces";
import {Subscription} from "./Subscription";

export class QueueAbstract<Q extends ITransport> {
readonly #transport: Q;

constructor(transport: Q) {
this.#transport = transport;
}

public get transport(): Q {
return this.#transport;
}

public async emit<M extends Message>(message: M): Promise<void> {
await this.#transport.send(message);
}

public subscribe<M extends Message>(type: MessageCtor<M>,
handler: IMessageHandler<M>): Subscription<M> {
return new Subscription<M>(this.#transport.channel(type), handler);
}
}
6 changes: 6 additions & 0 deletions packages/queue/src/Queue/SimpleQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {ITransport} from "./interfaces";
import {QueueAbstract} from "./QueueAbstract";

export class SimpleQueue<Q extends ITransport> extends QueueAbstract<Q> {

}
46 changes: 46 additions & 0 deletions packages/queue/src/Queue/Subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {assert} from "@typesafeunit/util";
import {HandleResult, IChannel, IMessageHandler, ISubscription} from "./interfaces";
import {MessageAbstract} from "./MessageAbstract";

export class Subscription<M extends MessageAbstract<any>> implements ISubscription<M> {
readonly #channel: IChannel<M>;
readonly #handler: IMessageHandler<M>;
readonly #state: Promise<void>;
readonly #listeners: ((result: HandleResult<M>) => unknown)[] = [];

constructor(channel: IChannel<M>, handler: IMessageHandler<M>) {
this.#channel = channel;
this.#handler = handler;
this.#state = this.run();
}

public async run(): Promise<void> {
for await (const transaction of this.#channel.subscribe()) {
try {
const {message} = transaction;
await this.#handler(message);
this.logResult(transaction.commit());
} catch (error) {
this.logResult(transaction.rollback(error));
}
}
}

public async unsubscribe(): Promise<void> {
assert(this.#channel.subscribed, "Already unsubscribed");
await this.#channel.unsubscribe();
await this.#state;
}

public listenResult(fn: (result: HandleResult<M>) => unknown): () => void {
this.#listeners.push(fn);
return () => {
this.#listeners.splice(this.#listeners.indexOf(fn), 1);
};
}

private async logResult(pending: Promise<HandleResult<M>>) {
const result = await pending;
this.#listeners.forEach((fn) => fn(result));
}
}
41 changes: 41 additions & 0 deletions packages/queue/src/Queue/TransactionAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {isFunction} from "@typesafeunit/util";
import {createHandleState} from "./fn";
import {HandleResult, IHandleFail, IHandleResultFactory, ITransaction, Message} from "./interfaces";

export abstract class TransactionAbstract<M extends Message> implements ITransaction<M> {
readonly #message: M;
readonly #commit?: (message: M) => void;
readonly #rollback?: (message: M) => void;
readonly #handle: IHandleResultFactory<M>;

constructor(message: M, commit?: (message: M) => void, rollback?: (message: M) => void) {
this.#message = message;
this.#commit = commit;
this.#rollback = rollback;
this.#handle = createHandleState(message);
}

public get message(): M {
return this.#message;
}

public async commit(): Promise<HandleResult<M>> {
try {
if (isFunction(this.#commit)) {
await this.#commit(this.message);
}

return this.#handle();
} catch (error) {
return this.#handle(error);
}
}

public async rollback(reason?: Error): Promise<IHandleFail<M>> {
if (isFunction(this.#rollback)) {
this.#rollback(this.message);
}

return this.#handle(reason ?? new Error("Unknown reason"));
}
}
45 changes: 45 additions & 0 deletions packages/queue/src/Queue/fn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import {assert} from "@typesafeunit/util";
import * as crypto from "crypto";
import {IHandleResultFactory, Message} from "./interfaces";

const serializeRe = /^[0-9a-f]{8}:.+$/;

export function serialize<M extends Message>(message: M): string {
const body = JSON.stringify(message.payload);
const signature = crypto.createHash("sha1")
.update(body)
.digest("hex")
.substr(0, 8);

return `${signature}:${body}`;
}

export function unserialize<T = unknown>(message: string): T {
assert(serializeRe.test(message), "Wrong message format");

const body = message.substr(9);
const signature = message.substr(0, 8);
const compareSignature = crypto.createHash("sha1")
.update(body)
.digest("hex")
.substr(0, 8);

assert(signature === compareSignature, "Wrong checksum");
return JSON.parse(body);
}

export function createHandleState<M extends Message>(message: M): IHandleResultFactory<M> {
const runAt = new Date();
return ((error?: Error) => {
if (error) {
return {runAt, error, message, status: false, finishAt: new Date()};
}

return {
runAt,
message,
finishAt: new Date(),
status: true,
};
}) as IHandleResultFactory<M>;
}
8 changes: 8 additions & 0 deletions packages/queue/src/Queue/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export * from "./fn";
export * from "./interfaces";
export * from "./QueueAbstract";
export * from "./ChannelAbstract";
export * from "./TransactionAbstract";
export * from "./MessageAbstract";
export * from "./Subscription";
export * from "./SimpleQueue";
76 changes: 76 additions & 0 deletions packages/queue/src/Queue/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import {Promisify} from "@typesafeunit/util";
import {MessageAbstract} from "./MessageAbstract";

export interface ITransaction<M extends Message> {
message: M;

commit(): Promise<HandleResult<M>>;

rollback(reason?: Error): Promise<IHandleFail<M>>;
}

export interface IChannel<M extends MessageAbstract<any>> {
readonly type: MessageCtor<M>;
readonly subscribed: boolean;
readonly key: string;

subscribe(): AsyncGenerator<ITransaction<M>>;

unsubscribe(): void;
}

export interface ITransport {
send<M extends Message>(message: M): Promisify<void>;

channel<M extends Message>(type: MessageCtor<M>): IChannel<M>;
}

export interface IMessageHandler<M extends MessageAbstract<any>> {
(message: M): unknown;
}

export type QueueKeys<T> = Extract<keyof T, string>;

export interface IHandleState<M extends Message> {
error?: Error;
status: boolean;
message: M;
finishAt: Date;
runAt: Date;
}

export interface IHandleSuccess<M extends Message> extends IHandleState<M> {
status: true;
error?: never;
}

export interface IHandleFail<M extends Message> extends IHandleState<M> {
status: false;
error: Error;
}

export type HandleResult<M extends Message> = IHandleSuccess<M> | IHandleFail<M>;

export interface IHandleResultFactory<M extends Message> {
(): IHandleSuccess<M>;

(error: Error): IHandleFail<M>;
}

export interface ISubscription<M extends Message> {
unsubscribe(): Promise<void>;

listenResult(fn: (result: HandleResult<M>) => unknown): void;
}

export type MessagePayload<T extends MessageAbstract<any>> = T extends MessageAbstract<infer P> ? P : never;

export interface MessageCtor<M extends MessageAbstract<any>> {
prototype: M;

readonly channel: string;

new(message: MessagePayload<M>): M;
}

export type Message<T extends MessageAbstract<any> = MessageAbstract<any>> = T;
40 changes: 40 additions & 0 deletions packages/queue/src/Redis/RedisChannel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import {Redis} from "ioredis";
import {ChannelAbstract, Message, MessageCtor, unserialize} from "../Queue";
import {RedisTransaction} from "./RedisTransaction";

export class RedisChannel<M extends Message> extends ChannelAbstract<M> {
readonly #connection: Redis;

#done = false;
#pending?: Promise<[string, string]>;

constructor(type: MessageCtor<M>, connection: Redis) {
super(type);
this.#connection = connection;
}

public get subscribed(): boolean {
return !this.#done;
}

public async* subscribe(): AsyncGenerator<RedisTransaction<M>> {
this.#connection.connect();
while (!this.#done) {
this.#pending = this.#connection.blpop(this.key, 100);
const response = await this.#pending;
if (response && response.length === 2) {
try {
const [, message] = response;
yield new RedisTransaction(new this.type(unserialize(message)));
} catch (error) {

}
}
}
}

public async unsubscribe(): Promise<void> {
this.#done = true;
await this.#pending;
}
}
5 changes: 5 additions & 0 deletions packages/queue/src/Redis/RedisTransaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {Message, TransactionAbstract} from "../Queue";

export class RedisTransaction<M extends Message> extends TransactionAbstract<M> {

}
Loading

0 comments on commit 6c4201b

Please sign in to comment.