Skip to content

Commit

Permalink
feat: queue
Browse files Browse the repository at this point in the history
  • Loading branch information
izatop committed Dec 2, 2020
1 parent 6c4201b commit 51471ce
Show file tree
Hide file tree
Showing 50 changed files with 821 additions and 392 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
"@commitlint/config-conventional": "^11.0.0",
"@types/jest": "^26.0.15",
"@types/node": "^14.14.10",
"@typescript-eslint/eslint-plugin": "^4.8.2",
"@typescript-eslint/parser": "^4.8.2",
"@typescript-eslint/eslint-plugin": "^4.9.0",
"@typescript-eslint/parser": "^4.9.0",
"cross-env": "^7.0.2",
"eslint": "^7.14.0",
"husky": "^4.3.0",
Expand Down
19 changes: 0 additions & 19 deletions packages/queue/src/Queue/ChannelAbstract.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
export class MessageAbstract<T> {
public id?: string;

export abstract class MessageAbstract<T> {
public readonly payload: T;

constructor(payload: T) {
this.payload = payload;
}

public get channel(): string {
return this.id ?? this.constructor.name;
return this.constructor.name;
}

public static get channel(): string {
Expand Down
5 changes: 5 additions & 0 deletions packages/queue/src/Queue/Message/TaskAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {MessageAbstract} from "./MessageAbstract";

export abstract class TaskAbstract<T, R> extends MessageAbstract<T> {
public abstract reply(reply: R): MessageAbstract<R>;
}
12 changes: 12 additions & 0 deletions packages/queue/src/Queue/Message/TransactionAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {hostname} from "os";
import {MessageAbstract} from "./MessageAbstract";

export abstract class TransactionAbstract<T> extends MessageAbstract<T> {
public static getBackupKey(): string {
return `${this.channel}:backup:${hostname()}`;
}

public static getFallbackKey(): string {
return `${this.channel}:fallback`;
}
}
3 changes: 3 additions & 0 deletions packages/queue/src/Queue/Message/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./MessageAbstract";
export * from "./TransactionAbstract";
export * from "./TaskAbstract";
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ITransport} from "./interfaces";
import {QueueAbstract} from "./QueueAbstract";

export class SimpleQueue<Q extends ITransport> extends QueueAbstract<Q> {
export class Queue<Q extends ITransport> extends QueueAbstract<Q> {

}
19 changes: 9 additions & 10 deletions packages/queue/src/Queue/QueueAbstract.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import {IMessageHandler, ITransport, Message, MessageCtor} from "./interfaces";
import {Subscription} from "./Subscription";
import {IDisposable} from "@typesafeunit/unit";
import {ISubscription, ITransport, Message, MessageCtor, MessageHandler, Task} from "./interfaces";

export class QueueAbstract<Q extends ITransport> {
export abstract class QueueAbstract<Q extends ITransport> implements IDisposable {
readonly #transport: Q;

constructor(transport: Q) {
this.#transport = transport;
}

public get transport(): Q {
return this.#transport;
public async send<M extends Message>(message: M): Promise<void> {
await this.#transport.send(message);
}

public async emit<M extends Message>(message: M): Promise<void> {
await this.#transport.send(message);
public subscribe<M extends Message | Task>(type: MessageCtor<M>, handler: MessageHandler<M>): ISubscription<M> {
return this.#transport.subscribe(type, handler);
}

public subscribe<M extends Message>(type: MessageCtor<M>,
handler: IMessageHandler<M>): Subscription<M> {
return new Subscription<M>(this.#transport.channel(type), handler);
public async dispose(): Promise<IDisposable> {
return this.#transport;
}
}
42 changes: 42 additions & 0 deletions packages/queue/src/Queue/ReadOperation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import {createReleaseState} from "./fn";
import {IHandleReleaseFactory, IReadOperation, IReadOperationFail, Message, OperationReleaseState} from "./interfaces";

export class ReadOperation<M extends Message> implements IReadOperation<M> {
readonly #message: M;
readonly #release: IHandleReleaseFactory<M>;

constructor(message: M) {
this.#message = message;
this.#release = createReleaseState(message);
}

public get channel(): string {
return this.#message.channel;
}

public get message(): M {
return this.#message;
}

public async commit(): Promise<OperationReleaseState<M>> {
try {
return this.release();
} catch (error) {
return this.release(error);
}
}

public async rollback(reason?: Error): Promise<IReadOperationFail<M>> {
return this.release(reason ?? new Error("Unknown reason"));
}

protected release(): OperationReleaseState<M>;
protected release(reason: Error): IReadOperationFail<M>;
protected release(reason?: Error): OperationReleaseState<M> {
if (reason) {
return this.#release(reason);
}

return this.#release();
}
}
35 changes: 35 additions & 0 deletions packages/queue/src/Queue/ReaderAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {tryUnserialize} from "./fn";
import {IQueueReader, IReadOperation, Message, MessageCtor, MessagePayload} from "./interfaces";

export abstract class ReaderAbstract<M extends Message,
MC extends MessageCtor<M>,
RO extends IReadOperation<M>> implements IQueueReader<M, RO> {
readonly #type: MC;

constructor(type: MC) {
this.#type = type;
}

public get channel(): string {
return this.#type.channel;
}

protected get type(): MC {
return this.#type;
}

public abstract cancel(): Promise<void>;

public abstract dispose(): Promise<void>;

public async read(): Promise<RO | undefined> {
const message = tryUnserialize<MessagePayload<M>>(await this.next());
if (message) {
return this.createReadOperation(new this.#type(message));
}
}

protected abstract next(): Promise<string | undefined>;

protected abstract createReadOperation(message: M): RO;
}
46 changes: 3 additions & 43 deletions packages/queue/src/Queue/Subscription.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,6 @@
import {assert} from "@typesafeunit/util";
import {HandleResult, IChannel, IMessageHandler, ISubscription} from "./interfaces";
import {MessageAbstract} from "./MessageAbstract";
import {Message} from "./interfaces";
import {SubscriptionAbstract} from "./SubscriptionAbstract";

export class Subscription<M extends MessageAbstract<any>> implements ISubscription<M> {
readonly #channel: IChannel<M>;
readonly #handler: IMessageHandler<M>;
readonly #state: Promise<void>;
readonly #listeners: ((result: HandleResult<M>) => unknown)[] = [];
export class Subscription<M extends Message> extends SubscriptionAbstract<M> {

constructor(channel: IChannel<M>, handler: IMessageHandler<M>) {
this.#channel = channel;
this.#handler = handler;
this.#state = this.run();
}

public async run(): Promise<void> {
for await (const transaction of this.#channel.subscribe()) {
try {
const {message} = transaction;
await this.#handler(message);
this.logResult(transaction.commit());
} catch (error) {
this.logResult(transaction.rollback(error));
}
}
}

public async unsubscribe(): Promise<void> {
assert(this.#channel.subscribed, "Already unsubscribed");
await this.#channel.unsubscribe();
await this.#state;
}

public listenResult(fn: (result: HandleResult<M>) => unknown): () => void {
this.#listeners.push(fn);
return () => {
this.#listeners.splice(this.#listeners.indexOf(fn), 1);
};
}

private async logResult(pending: Promise<HandleResult<M>>) {
const result = await pending;
this.#listeners.forEach((fn) => fn(result));
}
}
90 changes: 90 additions & 0 deletions packages/queue/src/Queue/SubscriptionAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import {IDisposable} from "@typesafeunit/unit";
import {assert, isDefined, isInstanceOf} from "@typesafeunit/util";
import {
IMessageHandler,
IQueueReader,
IReadOperation,
ISubscription,
ISubscriptionResultHandler,
ITransport,
Message,
MessageCtor,
OperationReleaseState,
} from "./interfaces";
import {TaskAbstract} from "./Message";

export abstract class SubscriptionAbstract<M extends Message> implements ISubscription<M> {
readonly #type: MessageCtor<M>;
readonly #reader: IQueueReader<M>;
readonly #transport: ITransport;
readonly #handler: IMessageHandler<M>;
readonly #watchers: ISubscriptionResultHandler<M>[] = [];

#subscribed = true;
#state?: Promise<void>;

constructor(transport: ITransport, type: MessageCtor<M>, handler: IMessageHandler<M>) {
this.#type = type;
this.#reader = transport.reader(type);
this.#handler = handler;
this.#transport = transport;
this.#state = this.listen();
}

public get subscribed(): boolean {
return this.#subscribed;
}

public async subscribe(): Promise<void> {
assert(!this.subscribed, `The ${this.#type.channel} channel already subscribed`);
this.#subscribed = true;
this.#state = this.listen();
}

public async unsubscribe(): Promise<void> {
this.#subscribed = false;
this.#reader.cancel();
await this.#state;
}

public watch(fn: ISubscriptionResultHandler<M>): () => void {
this.#watchers.push(fn);
return () => {
this.#watchers.splice(this.#watchers.indexOf(fn), 1);
};
}

public async dispose(): Promise<IDisposable> {
await this.unsubscribe();
this.#watchers.splice(0, this.#watchers.length);
return this.#reader;
}

protected async listen(): Promise<void> {
while (this.#subscribed) {
const readOperation = await this.#reader.read();
if (readOperation) {
await this.handle(readOperation);
}
}
}

protected async handle(operation: IReadOperation<M>): Promise<void> {
const {message} = operation;
try {
const reply = await this.#handler(message);
if (isDefined(reply) && isInstanceOf(message, TaskAbstract)) {
await this.#transport.send(message.reply(reply));
}

await this.fire(operation.commit());
} catch (error) {
await this.fire(operation.rollback(error));
}
}

private async fire(operation: Promise<OperationReleaseState<M>>) {
const value = await operation;
this.#watchers.forEach(async (fn) => fn(value));
}
}
41 changes: 0 additions & 41 deletions packages/queue/src/Queue/TransactionAbstract.ts

This file was deleted.

Loading

0 comments on commit 51471ce

Please sign in to comment.