Skip to content

Commit

Permalink
fix(break): subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
izatop committed Nov 12, 2021
1 parent b392d4e commit c0d6dd0
Show file tree
Hide file tree
Showing 55 changed files with 871 additions and 707 deletions.
14 changes: 7 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
"license": "MIT",
"private": true,
"devDependencies": {
"@commitlint/cli": "^13.2.1",
"@commitlint/config-conventional": "^13.2.0",
"@commitlint/cli": "^14.1.0",
"@commitlint/config-conventional": "^14.1.0",
"@types/jest": "^27.0.2",
"@types/node": "^16.10.5",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"@types/node": "^16.11.7",
"@typescript-eslint/eslint-plugin": "^5.3.1",
"@typescript-eslint/parser": "^5.3.1",
"cross-env": "^7.0.3",
"eslint": "^8.0.0",
"eslint-plugin-unused-imports": "^1.1.5",
"eslint": "^8.2.0",
"eslint-plugin-unused-imports": "^2.0.0",
"jest": "^27.2.5",
"rimraf": "^3.0.2",
"ts-jest": "^27.0.5",
Expand Down
9 changes: 3 additions & 6 deletions packages/cli/src/Commander.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Application, IRoute} from "@bunt/app";
import {Action, ContextArg, Heartbeat, IDisposable, IRunnable} from "@bunt/unit";
import {Action, ContextArg, Heartbeat, IRunnable} from "@bunt/unit";
import {CommandContext} from "./Context/CommandContext";
import {RequestCommand} from "./Request";

export class Commander<C extends CommandContext> implements IRunnable, IDisposable {
export class Commander<C extends CommandContext> implements IRunnable {
readonly #application: Application<C>;

protected constructor(application: Application<C>) {
Expand All @@ -13,6 +13,7 @@ export class Commander<C extends CommandContext> implements IRunnable, IDisposab
public static async execute<C extends CommandContext>(
context: ContextArg<C>, routes: IRoute<Action<C, any, IRunnable>>[] = []): Promise<IRunnable | undefined> {
const command = new this<C>(await Application.factory<C>(context, routes));

return command.handle();
}

Expand All @@ -23,10 +24,6 @@ export class Commander<C extends CommandContext> implements IRunnable, IDisposab
return this.#application.run(request);
}

public async dispose(): Promise<void> {
return;
}

public getHeartbeat(): Heartbeat {
return Heartbeat.create(this);
}
Expand Down
7 changes: 1 addition & 6 deletions packages/cli/test/src/RunnableCommand.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {Disposable, dispose, isDisposable, isRunnable} from "@bunt/unit";
import {AsyncState} from "@bunt/util";
import {dispose, isDisposable, isRunnable} from "@bunt/unit";
import {ok} from "assert";
import {Commander} from "../../src";
import {RunnableTestCommand} from "./app/Action/RunnableTestCommand";
Expand All @@ -17,13 +16,9 @@ test("Runnable Command", async () => {

expect(heartbeat.beats).toBe(true);
expect(pending.then).not.toBeUndefined();
expect(AsyncState.has(pending)).toBe(true);
expect(AsyncState.isReleased(pending)).toBe(false);

expect(isDisposable(result)).toBe(true);
ok(isDisposable(result));

await dispose(result);
await expect(heartbeat.beats).toBe(false);
expect(Disposable.size).toBe(0);
});
9 changes: 4 additions & 5 deletions packages/project/resources/lint/.eslintrc.dist
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@
}
},
"rules": {
"semi": "off",
"quotes": "off",
"no-console": "error",
"no-debugger": "error",
"semi": [
"error",
"always"
],
"@typescript-eslint/explicit-function-return-type": "off",
"@typescript-eslint/no-explicit-any": "off",
"@typescript-eslint/no-empty-interface": "warn",
"@typescript-eslint/interface-name-prefix": "off",
"@typescript-eslint/explicit-module-boundary": "off",
"@typescript-eslint/explicit-module-boundary-types": "off",
"@typescript-eslint/quotes": "error",
"@typescript-eslint/camelcase": "off",
"@typescript-eslint/member-delimiter-style": [
"error",
Expand Down
2 changes: 1 addition & 1 deletion packages/queue/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"dependencies": {
"@bunt/unit": "^0.21.0",
"@bunt/util": "^0.21.0",
"@types/ioredis": "^4.27.7",
"@types/ioredis": "^4.28.1",
"ioredis": "^4.27.11"
},
"license": "MIT"
Expand Down
18 changes: 9 additions & 9 deletions packages/queue/src/Dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {ActionCtor, Context, ContextArg, Disposable, Heartbeat, IDisposable, IRunnable, unit, Unit} from "@bunt/unit";
import {ActionCtor, Context, ContextArg, Disposer, Heartbeat, IRunnable, unit, Unit} from "@bunt/unit";
import {Ctor, logger, Logger} from "@bunt/util";
import {Handler} from "./Handler";
import {ITransport} from "./interfaces";
import {Incoming, MessageCtor, Queue, QueueAbstract} from "./Queue";

export class Dispatcher<C extends Context> implements IDisposable, IRunnable {
export class Dispatcher<C extends Context> extends Disposer implements IRunnable {
@logger
public logger!: Logger;

Expand All @@ -13,8 +13,11 @@ export class Dispatcher<C extends Context> implements IDisposable, IRunnable {
readonly #route = new Map<MessageCtor<any>, ActionCtor<C>>();

protected constructor(u: Unit<C>, queue: QueueAbstract<ITransport>) {
this.#queue = queue;
super();

this.#unit = u;
this.#queue = queue;
this.onDispose(queue);
}

public get size(): number {
Expand All @@ -23,6 +26,7 @@ export class Dispatcher<C extends Context> implements IDisposable, IRunnable {

public static async factory<C extends Context>(
context: ContextArg<C>, queue: Queue<ITransport>): Promise<Dispatcher<C>> {

return new this(await unit(context), queue);
}

Expand All @@ -35,13 +39,9 @@ export class Dispatcher<C extends Context> implements IDisposable, IRunnable {
this.#unit.add(action);
}

const subscription = this.#queue.subscribe<any>(type, ({payload}) => this.#unit.run(action, {payload}));
Disposable.attach(this, subscription);
const subscription = this.#queue.on<any>(type, ({payload}) => this.#unit.run(action, {payload}));
this.onDispose(subscription);

return this;
}

public async dispose(): Promise<void> {
return;
}
}
14 changes: 5 additions & 9 deletions packages/queue/src/PubSub/PubSubAbstract.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import {Disposable, IDisposable} from "@bunt/unit";
import {Disposer} from "@bunt/unit";
import {isArray} from "@bunt/util";
import {IPubSubTransport, PubSubChannel} from "./interfaces";
import {Subscription} from "./Subscription";

export abstract class PubSubAbstract<S extends Record<string, any>>
implements IDisposable {
export abstract class PubSubAbstract<S extends Record<string, any>> extends Disposer {
readonly #transport: IPubSubTransport;

public constructor(transport: IPubSubTransport) {
this.#transport = transport;
super();

Disposable.attach(this, transport);
this.#transport = transport;
this.onDispose(this.#transport);
}

public key<K extends keyof S>(channel: PubSubChannel<K>): string {
Expand All @@ -29,10 +29,6 @@ implements IDisposable {
);
}

public async dispose(): Promise<void> {
return;
}

protected abstract serialize<K extends keyof S>(message: S[K]): string;

protected abstract parse<K extends keyof S>(message: string): S[K];
Expand Down
121 changes: 33 additions & 88 deletions packages/queue/src/PubSub/Subscription.ts
Original file line number Diff line number Diff line change
@@ -1,114 +1,59 @@
import {AsyncCallback} from "@bunt/util";
import {ISubscriptionManager} from "./interfaces";
import {Disposable} from "@bunt/unit";
import {all} from "@bunt/util";
import {SubscriptionManager} from "./SubscriptionManager";
import {SubscriptionIterator} from "./SubscriptionIterator";

export class Subscription<T> implements AsyncIterable<T> {
export class Subscription<T> implements AsyncIterable<T>, Disposable {
public readonly channel: string;

readonly #parser: (message: string) => T;
readonly #manager: ISubscriptionManager;
readonly #subscriptions = new Set<AsyncCallback<T>>();
readonly #manager: SubscriptionManager;
readonly #subscriptions = new Set<SubscriptionIterator<T>>();

constructor(channel: string, manager: ISubscriptionManager, parser: (message: string) => T) {
constructor(channel: string, manager: SubscriptionManager, parser: (message: string) => T) {
this.channel = channel;
this.#manager = manager;
this.#parser = parser;
}

public async *subscribe(): AsyncGenerator<T> {
return this[Symbol.asyncIterator];
public get size() {
return this.#subscriptions.size;
}

public ensure(): Promise<void> {
return this.#manager.ensure(this.channel);
}

/**
* Unsubscribe all active subscriptions
*/
public async unsubscribe() {
const pending: Promise<void>[] = [];
for (const iterator of this.#subscriptions.values()) {
iterator.dispose();
pending.push(iterator.destroy());
}

await all(pending);
}

public async* [Symbol.asyncIterator](): AsyncGenerator<T> {
const generator = new SubscriptionGenerator<T>();
const id = await this.#manager.subscribe(
public [Symbol.asyncIterator](): SubscriptionIterator<T> {
const iterator = new SubscriptionIterator<T>();
const id = this.#manager.on(
this.channel,
(message) => generator.push(this.#parser(message)),
(message) => iterator.push(this.#parser(message)),
);

generator.stop = async () => {
await this.#manager.unsubscribe(id);
};

return generator;
}
}

class SubscriptionGenerator<T, TNext = unknown> implements AsyncGenerator<T, undefined, TNext> {
readonly #queue: Defer<T | undefined>[] = [];
#done = false;

public pull(): Defer<T | undefined> {
const pending = new Defer<T | undefined>();
this.#queue.push(pending);

return pending;
}

public push(value: T | undefined): void {
this.#queue.splice(0, this.#queue.length)
.forEach((pending) => pending.resolve(value));
}

public async next(): Promise<IteratorResult<T, undefined>> {
const value = await this.pull();
if (value) {
return {value};
}

return {value: undefined, done: true};
}

public async return(): Promise<IteratorResult<T, undefined>> {
this.close();

return {value: undefined, done: true};
}

public async throw(): Promise<IteratorResult<T, undefined>> {
this.close();

return {value: undefined, done: true};
}

public [Symbol.asyncIterator](): AsyncGenerator<T, undefined, TNext> {
return this;
}

public stop?(): unknown;

private async close() {
await this.stop?.();
this.push(undefined);
}
}

class Defer<T> extends Promise<T> {
#resolve = (_value: T | PromiseLike<T>) => {
// noop
};

#reject = (_error: Error) => {
// noop
};

constructor() {
super((resolve, reject) => {
this.#resolve = resolve;
this.#reject = reject;
iterator.unsubscribe(() => {
this.#manager.off(id);
this.#subscriptions.delete(iterator);
});
}

public resolve(value: T | PromiseLike<T>) {
this.#resolve(value);
this.#subscriptions.add(iterator);

return iterator;
}

public reject(error: Error) {
this.#reject(error);
public async dispose() {
await this.unsubscribe();
}
}
38 changes: 38 additions & 0 deletions packages/queue/src/PubSub/SubscriptionIterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {AsyncPushPull, isUndefined, Promisify} from "@bunt/util";

export class SubscriptionIterator<T, TNext = unknown> extends AsyncPushPull<T | undefined>
implements AsyncIterator<T, undefined, TNext> {
#unsubscribe?: () => Promisify<void>;

#done = false;

public async next(): Promise<IteratorResult<T, undefined>> {
const value = await this.pull();
if (isUndefined(value)) {
return {value, done: true};
}

return {value, done: false};
}

public async return(): Promise<IteratorResult<T, undefined>> {
this.destroy();

return {value: undefined, done: true};
}

public async throw(): Promise<IteratorResult<T, undefined>> {
this.destroy();

return {value: undefined, done: true};
}

public unsubscribe(fn: () => Promisify<void>): void {
this.#unsubscribe = fn;
}

public async destroy() {
await this.#unsubscribe?.();
super.destroy();
}
}
Loading

0 comments on commit c0d6dd0

Please sign in to comment.