Skip to content

Commit

Permalink
fix: update deps, pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
izatop committed Jan 22, 2021
1 parent 9a2906b commit 5dcff6f
Show file tree
Hide file tree
Showing 36 changed files with 359 additions and 3,647 deletions.
24 changes: 0 additions & 24 deletions lerna.json

This file was deleted.

7 changes: 2 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@
"eslint": "^7.18.0",
"husky": "^4.3.8",
"jest": "^26.6.3",
"lerna": "^3.22.1",
"rimraf": "^3.0.2",
"ts-jest": "^26.4.4",
"ts-node": "^9.1.1",
"typescript": "^4.1.3"
},
"resolutions": {
"minimist": "1.2.5"
},
"dependencies": {
"eslint-plugin-react": "^7.22.0"
}
"dependencies": {}
}
3 changes: 1 addition & 2 deletions packages/project/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
"@bunt/app": "^0.14.4",
"@bunt/cli": "^0.14.4",
"@bunt/unit": "^0.14.4",
"@bunt/util": "^0.14.4",
"path-to-regexp": "^6.2.0"
"@bunt/util": "^0.14.4"
},
"license": "MIT"
}
File renamed without changes.
5 changes: 5 additions & 0 deletions packages/project/src/Resource/ResourceNotFound.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export class ResourceNotFound extends Error {
constructor(file: string) {
super(`Resource ${file} not found`);
}
}
11 changes: 9 additions & 2 deletions packages/project/src/Resource/ResourceStore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {promises} from "fs";
import {basename, join} from "path";
import {Resource} from "./Resource";
import {ResourceNotFound} from "./ResourceNotFound";

const {stat, copyFile} = promises;

Expand Down Expand Up @@ -32,8 +33,14 @@ export class ResourceStore {

protected async resolve(file: string): Promise<string> {
const path = join(this.location, file);
await stat(path);
const paths = [path, path.concat(".dist")];
for (const variant of paths) {
const resolved = await stat(variant).catch(() => null);
if (resolved) {
return variant;
}
}

return path;
throw new ResourceNotFound(file);
}
}
1 change: 1 addition & 0 deletions packages/project/src/Resource/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./Resource";
export * from "./ResourceStore";
export * from "./ResourceNotFound";
2 changes: 1 addition & 1 deletion packages/queue/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"dependencies": {
"@bunt/unit": "^0.14.4",
"@bunt/util": "^0.14.4",
"@types/ioredis": "^4.19.1",
"@types/ioredis": "^4.19.2",
"ioredis": "^4.19.4"
},
"license": "MIT"
Expand Down
4 changes: 2 additions & 2 deletions packages/queue/src/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import {
Unit,
} from "@bunt/unit";
import {logger, Logger} from "@bunt/util";
import {ActionHandler} from "./interfaces";
import {ITransport, Message, MessageCtor, MessageHandler, Queue, QueueAbstract} from "./Queue";
import {ActionHandler, ITransport} from "./interfaces";
import {Message, MessageCtor, MessageHandler, Queue, QueueAbstract} from "./Queue";

export class Dispatcher<C extends IContext> implements IDisposable, IRunnable {
@logger
Expand Down
53 changes: 53 additions & 0 deletions packages/queue/src/PubSub/PubSubAbstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import {Disposable, IDisposable} from "@bunt/unit";
import {AsyncCallback, Fn, Promisify} from "@bunt/util";
import {IPubSubTransport, ISubscriber} from "./interfaces";

export abstract class PubSubAbstract<S extends Record<string, any>, T extends IPubSubTransport>
implements IDisposable {
readonly #transport: T;
readonly #subscriptions = new Map<string, ISubscriber>();
readonly #iterables = new Set<AsyncCallback<any>>();

public constructor(transport: T) {
this.#transport = transport;
}

public async publish<K extends keyof S>(channel: Extract<K, string>, message: S[K]): Promise<void> {
await this.#transport.publish(channel, this.serialize(message));
}

public async subscribe<K extends keyof S>(channel: Extract<K, string>): Promise<AsyncIterable<S[K]>>;
public async subscribe<K extends keyof S>(channel: Extract<K, string>,
listener: Fn<[S[K]], unknown>): Promise<() => void>;
public async subscribe<K extends keyof S>(
channel: Extract<K, string>, listener?: Fn<[S[K]], unknown>): Promise<AsyncIterable<S[K]> | (() => void)> {
const subscription = this.#subscriptions.get(channel) ?? await this.#transport.subscribe(channel);
if (!this.#subscriptions.has(channel)) {
await subscription.subscribe();
this.#subscriptions.set(channel, subscription);
}

if (listener) {
return subscription.listen((message) => listener(this.parse(message)));
}

const iterable = new AsyncCallback<S[K]>((emit) => {
return subscription.listen((message) => emit(this.parse(message)));
});

this.#iterables.add(iterable);
return iterable;
}

public dispose(): Promisify<Disposable | Disposable[] | void> {
for (const iterable of this.#iterables.values()) {
iterable.dispose();
}

return this.#transport.dispose();
}

protected abstract serialize<K extends keyof S>(message: S[K]): string;

protected abstract parse<K extends keyof S>(message: string): S[K];
}
12 changes: 12 additions & 0 deletions packages/queue/src/PubSub/PubSubSimple.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {IPubSubTransport} from "./interfaces";
import {PubSubAbstract} from "./PubSubAbstract";

export class PubSubSimple<S extends Record<string, any>, T extends IPubSubTransport> extends PubSubAbstract<S, T> {
protected serialize<K extends keyof S>(message: S[K]): string {
return JSON.stringify(message);
}

protected parse<K extends keyof S>(message: string): S[K] {
return JSON.parse(message);
}
}
19 changes: 19 additions & 0 deletions packages/queue/src/PubSub/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import {Fn} from "@bunt/util";
import {ITransport} from "../interfaces";

export interface ISubscriber {
readonly channel: string;

unsubscribe(): Promise<void>;
subscribe(): Promise<void>;

listen(listener: Fn<[string], unknown>): Fn;

close(): void;
}

export interface IPubSubTransport extends ITransport {
publish(channel: string, message: string): Promise<number>;

subscribe(channel: string): Promise<ISubscriber>;
}
2 changes: 1 addition & 1 deletion packages/queue/src/Queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {ITransport} from "./interfaces";
import {ITransport} from "../interfaces";
import {QueueAbstract} from "./QueueAbstract";

export class Queue<Q extends ITransport> extends QueueAbstract<Q> {
Expand Down
7 changes: 4 additions & 3 deletions packages/queue/src/Queue/QueueAbstract.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {IDisposable} from "@bunt/unit";
import {ISubscription, ITransport, Message, MessageCtor, MessageHandler, Task} from "./interfaces";
import {ITransport} from "../interfaces";
import {IQueueList, Message, MessageCtor, MessageHandler, Task} from "./interfaces";

export abstract class QueueAbstract<Q extends ITransport> implements IDisposable {
readonly #transport: Q;
Expand All @@ -12,8 +13,8 @@ export abstract class QueueAbstract<Q extends ITransport> implements IDisposable
await this.#transport.send(message);
}

public subscribe<M extends Message | Task>(type: MessageCtor<M>, handler: MessageHandler<M>): ISubscription<M> {
return this.#transport.subscribe(type, handler);
public subscribe<M extends Message | Task>(type: MessageCtor<M>, handler: MessageHandler<M>): IQueueList<M> {
return this.#transport.createQueueList(type, handler);
}

public async dispose(): Promise<IDisposable> {
Expand Down
6 changes: 6 additions & 0 deletions packages/queue/src/Queue/QueueList.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {Message} from "./interfaces";
import {QueueListAbstract} from "./QueueListAbstract";

export class QueueList<M extends Message> extends QueueListAbstract<M> {

}
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
import {Disposable} from "@bunt/unit";
import {assert, isDefined, isInstanceOf} from "@bunt/util";
import {ITransport} from "../interfaces";
import {
IMessageHandler,
IQueueList,
IQueueListWatcher,
IQueueReader,
IReadOperation,
ISubscription,
ISubscriptionResultHandler,
ITransport,
Message,
MessageCtor,
OperationReleaseState,
} from "./interfaces";
import {TaskAbstract} from "./Message";

export abstract class SubscriptionAbstract<M extends Message> implements ISubscription<M> {
export abstract class QueueListAbstract<M extends Message> implements IQueueList<M> {
readonly #type: MessageCtor<M>;
readonly #reader: IQueueReader<M>;
readonly #transport: ITransport;
readonly #handler: IMessageHandler<M>;
readonly #watchers: ISubscriptionResultHandler<M>[] = [];
readonly #watchers: IQueueListWatcher<M>[] = [];

#subscribed = true;
#state?: Promise<void>;

constructor(transport: ITransport, type: MessageCtor<M>, handler: IMessageHandler<M>) {
this.#type = type;
this.#reader = transport.reader(type);
this.#reader = transport.createQueueReader(type);
this.#handler = handler;
this.#transport = transport;
this.#state = this.listen();
Expand All @@ -43,11 +43,11 @@ export abstract class SubscriptionAbstract<M extends Message> implements ISubscr

public async unsubscribe(): Promise<void> {
this.#subscribed = false;
this.#reader.cancel();
await this.#state;
await this.#reader.cancel()
.finally(() => this.#state);
}

public watch(fn: ISubscriptionResultHandler<M>): () => void {
public watch(fn: IQueueListWatcher<M>): () => void {
this.#watchers.push(fn);
return () => {
this.#watchers.splice(this.#watchers.indexOf(fn), 1);
Expand Down Expand Up @@ -89,6 +89,6 @@ export abstract class SubscriptionAbstract<M extends Message> implements ISubscr

private async fire(operation: Promise<OperationReleaseState<M>>) {
const value = await operation;
this.#watchers.forEach(async (fn) => fn(value));
this.#watchers.forEach((fn) => fn(value));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {tryUnserialize} from "./fn";
import {IQueueReader, IReadOperation, Message, MessageCtor, MessagePayload} from "./interfaces";

export abstract class ReaderAbstract<M extends Message,
export abstract class QueueReaderAbstract<M extends Message,
MC extends MessageCtor<M>,
RO extends IReadOperation<M>> implements IQueueReader<M, RO> {
readonly #type: MC;
Expand Down
6 changes: 0 additions & 6 deletions packages/queue/src/Queue/Subscription.ts

This file was deleted.

10 changes: 5 additions & 5 deletions packages/queue/src/Queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
export * from "./fn";
export * from "./interfaces";
export * from "./ReadOperation";
export * from "./ReaderAbstract";
export * from "./SubscriptionAbstract";
export * from "./Subscription";
export * from "./Message";
export * from "./QueueListAbstract";
export * from "./QueueList";
export * from "./QueueAbstract";
export * from "./Queue";
export * from "./Message";
export * from "./QueueReaderAbstract";
export * from "./ReadOperation";
14 changes: 3 additions & 11 deletions packages/queue/src/Queue/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ export interface IReadOperation<M extends Message> {
rollback(reason?: Error): Promise<IReadOperationFail<M>>;
}

export interface ITransport extends IDisposable {
send<M extends Message>(message: M): Promisify<void>;

subscribe<M extends Message>(type: MessageCtor<M>, handler: MessageHandler<M>): ISubscription<M>;

reader<M extends Message>(type: MessageCtor<M>): IQueueReader<M>;
}

export interface IQueueReader<M extends Message, RO extends IReadOperation<M> = IReadOperation<M>>
extends IDisposable {
readonly channel: string;
Expand Down Expand Up @@ -70,18 +62,18 @@ export interface IHandleReleaseFactory<M extends Message> {
(error: Error): IReadOperationFail<M>;
}

export interface ISubscriptionResultHandler<M extends Message> {
export interface IQueueListWatcher<M extends Message> {
(result: OperationReleaseState<M>): unknown;
}

export interface ISubscription<M extends Message> extends IDisposable {
export interface IQueueList<M extends Message> extends IDisposable {
readonly subscribed: boolean;

unsubscribe(): Promise<void>;

subscribe(): Promise<void>;

watch(fn: ISubscriptionResultHandler<M>): void;
watch(fn: IQueueListWatcher<M>): void;
}

export type MessagePayload<M extends Message> = M extends MessageAbstract<infer P> ? P : never;
Expand Down
4 changes: 2 additions & 2 deletions packages/queue/src/Redis/RedisQ2Reader.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {ITransactionType, Message, MessageCtor} from "../Queue";
import {RedisQ2ReadOperation} from "./RedisQ2ReadOperation";
import {RedisReader} from "./RedisReader";
import {RedisQueueReader} from "./RedisQueueReader";
import {RedisTransport} from "./RedisTransport";

export class RedisQ2Reader<M extends Message,
MC extends MessageCtor<M> & ITransactionType> extends RedisReader<M, MC> {
MC extends MessageCtor<M> & ITransactionType> extends RedisQueueReader<M, MC> {
constructor(transport: RedisTransport, type: MC) {
super(transport, type);
}
Expand Down
5 changes: 5 additions & 0 deletions packages/queue/src/Redis/RedisQueueList.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {Message, QueueListAbstract} from "../Queue";

export class RedisQueueList<M extends Message> extends QueueListAbstract<M> {

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import {Redis} from "ioredis";
import {Message, MessageCtor, ReadOperation} from "../Queue";
import {ReaderAbstract} from "../Queue/ReaderAbstract";
import {Message, MessageCtor, QueueReaderAbstract, ReadOperation} from "../Queue";
import {RedisTransport} from "./RedisTransport";

export class RedisReader<M extends Message, MC extends MessageCtor<M>>
extends ReaderAbstract<M, MC, ReadOperation<M>> {
export class RedisQueueReader<M extends Message, MC extends MessageCtor<M>>
extends QueueReaderAbstract<M, MC, ReadOperation<M>> {
protected readonly timeout = 100;
readonly #transport: RedisTransport;
readonly #connection: Redis;
Expand All @@ -28,7 +27,8 @@ export class RedisReader<M extends Message, MC extends MessageCtor<M>>
}

protected next(): Promise<string | undefined> {
return this.wrap(this.#connection.brpop(this.channel, this.timeout).then((message) => message?.[1]));
return this.wrap(this.#connection.brpop(this.channel, this.timeout)
.then((message) => message?.[1]));
}

protected createReadOperation(message: M): ReadOperation<M> {
Expand Down
Loading

0 comments on commit 5dcff6f

Please sign in to comment.