Skip to content

Commit

Permalink
fix: pubsub subkeys added
Browse files Browse the repository at this point in the history
  • Loading branch information
izatop committed Jan 26, 2021
1 parent fcf2202 commit ed829f2
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 55 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
"@commitlint/config-conventional": "^11.0.0",
"@types/jest": "^26.0.20",
"@types/node": "^14.14.22",
"@typescript-eslint/eslint-plugin": "^4.14.0",
"@typescript-eslint/parser": "^4.14.0",
"@typescript-eslint/eslint-plugin": "^4.14.1",
"@typescript-eslint/parser": "^4.14.1",
"cross-env": "^7.0.3",
"eslint": "^7.18.0",
"husky": "^4.3.8",
Expand Down
27 changes: 16 additions & 11 deletions packages/queue/src/PubSub/PubSubAbstract.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Disposable, IDisposable} from "@bunt/unit";
import {AsyncCallback, Fn, Promisify} from "@bunt/util";
import {IPubSubTransport, ISubscriber} from "./interfaces";
import {AsyncCallback, Fn, isArray, Promisify} from "@bunt/util";
import {IPubSubTransport, ISubscriber, PubSubChannel} from "./interfaces";

export abstract class PubSubAbstract<S extends Record<string, any>, T extends IPubSubTransport>
implements IDisposable {
Expand All @@ -12,19 +12,20 @@ export abstract class PubSubAbstract<S extends Record<string, any>, T extends IP
this.#transport = transport;
}

public async publish<K extends keyof S>(channel: Extract<K, string>, message: S[K]): Promise<void> {
await this.#transport.publish(channel, this.serialize(message));
public async publish<K extends keyof S>(channel: PubSubChannel<K>, message: S[K]): Promise<void> {
await this.#transport.publish(this.key(channel), this.serialize(message));
}

public async subscribe<K extends keyof S>(channel: Extract<K, string>): Promise<AsyncIterable<S[K]>>;
public async subscribe<K extends keyof S>(channel: Extract<K, string>,
public async subscribe<K extends keyof S>(channel: PubSubChannel<string>): Promise<AsyncIterable<S[K]>>;
public async subscribe<K extends keyof S>(channel: PubSubChannel<string>,
listener: Fn<[S[K]], unknown>): Promise<() => void>;
public async subscribe<K extends keyof S>(
channel: Extract<K, string>, listener?: Fn<[S[K]], unknown>): Promise<AsyncIterable<S[K]> | (() => void)> {
const subscription = this.#subscriptions.get(channel) ?? await this.#transport.subscribe(channel);
if (!this.#subscriptions.has(channel)) {
channel: PubSubChannel<string>, listener?: Fn<[S[K]], unknown>): Promise<AsyncIterable<S[K]> | (() => void)> {
const key = this.key(channel);
const subscription = this.#subscriptions.get(key) ?? await this.#transport.subscribe(key);
if (!this.#subscriptions.has(key)) {
await subscription.subscribe();
this.#subscriptions.set(channel, subscription);
this.#subscriptions.set(key, subscription);
}

if (listener) {
Expand All @@ -41,7 +42,7 @@ export abstract class PubSubAbstract<S extends Record<string, any>, T extends IP

public async asyncIterator<K extends keyof S>(channel: Extract<K, string>): Promise<AsyncIterator<S[K]>> {
const subscription = await this.subscribe(channel);
return subscription[Symbol.asyncIterator]();
return subscription[Symbol.asyncIterator]() as AsyncIterator<S[K]>;
}

public dispose(): Promisify<Disposable | Disposable[] | void> {
Expand All @@ -52,6 +53,10 @@ export abstract class PubSubAbstract<S extends Record<string, any>, T extends IP
return this.#transport.dispose();
}

protected key(channel: PubSubChannel<string>): string {
return isArray(channel) ? channel.join("/") : channel;
}

protected abstract serialize<K extends keyof S>(message: S[K]): string;

protected abstract parse<K extends keyof S>(message: string): S[K];
Expand Down
4 changes: 4 additions & 0 deletions packages/queue/src/PubSub/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import {Fn} from "@bunt/util";
import {ITransport} from "../interfaces";

export type PubSubChannel<K extends string | symbol | number = string> = Extract<K, string> |
[channel: Extract<K, string>, ...channelSubKeys: (string | number)[]];

export interface ISubscriber {
readonly channel: string;

unsubscribe(): Promise<void>;

subscribe(): Promise<void>;

listen(listener: Fn<[string], unknown>): Fn;
Expand Down
3 changes: 2 additions & 1 deletion packages/queue/test/src/PubSub.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import {Disposer} from "@bunt/unit";
import {RedisTransport} from "../../src";
import {PubSubChannel} from "../../src/PubSub/interfaces";
import {PubSubSimple} from "../../src/PubSub/PubSubSimple";

describe.skip("PubSub", () => {
test("Main", async () => {
const channel = "foo";
const channel: PubSubChannel<any> = ["foo", 1];
const transport = new RedisTransport("redis://127.0.0.1:6379");
const pubSub = new PubSubSimple(transport);
const subscribe = await pubSub.subscribe(channel);
Expand Down
82 changes: 41 additions & 41 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1706,75 +1706,75 @@
dependencies:
"@types/yargs-parser" "*"

"@typescript-eslint/eslint-plugin@^4.14.0":
version "4.14.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.14.0.tgz#92db8e7c357ed7d69632d6843ca70b71be3a721d"
integrity sha512-IJ5e2W7uFNfg4qh9eHkHRUCbgZ8VKtGwD07kannJvM5t/GU8P8+24NX8gi3Hf5jST5oWPY8kyV1s/WtfiZ4+Ww==
"@typescript-eslint/eslint-plugin@^4.14.1":
version "4.14.1"
resolved "https://registry.yarnpkg.com/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.14.1.tgz#22dd301ce228aaab3416b14ead10b1db3e7d3180"
integrity sha512-5JriGbYhtqMS1kRcZTQxndz1lKMwwEXKbwZbkUZNnp6MJX0+OVXnG0kOlBZP4LUAxEyzu3cs+EXd/97MJXsGfw==
dependencies:
"@typescript-eslint/experimental-utils" "4.14.0"
"@typescript-eslint/scope-manager" "4.14.0"
"@typescript-eslint/experimental-utils" "4.14.1"
"@typescript-eslint/scope-manager" "4.14.1"
debug "^4.1.1"
functional-red-black-tree "^1.0.1"
lodash "^4.17.15"
regexpp "^3.0.0"
semver "^7.3.2"
tsutils "^3.17.1"

"@typescript-eslint/[email protected].0":
version "4.14.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/experimental-utils/-/experimental-utils-4.14.0.tgz#5aa7b006736634f588a69ee343ca959cd09988df"
integrity sha512-6i6eAoiPlXMKRbXzvoQD5Yn9L7k9ezzGRvzC/x1V3650rUk3c3AOjQyGYyF9BDxQQDK2ElmKOZRD0CbtdkMzQQ==
"@typescript-eslint/[email protected].1":
version "4.14.1"
resolved "https://registry.yarnpkg.com/@typescript-eslint/experimental-utils/-/experimental-utils-4.14.1.tgz#a5c945cb24dabb96747180e1cfc8487f8066f471"
integrity sha512-2CuHWOJwvpw0LofbyG5gvYjEyoJeSvVH2PnfUQSn0KQr4v8Dql2pr43ohmx4fdPQ/eVoTSFjTi/bsGEXl/zUUQ==
dependencies:
"@types/json-schema" "^7.0.3"
"@typescript-eslint/scope-manager" "4.14.0"
"@typescript-eslint/types" "4.14.0"
"@typescript-eslint/typescript-estree" "4.14.0"
"@typescript-eslint/scope-manager" "4.14.1"
"@typescript-eslint/types" "4.14.1"
"@typescript-eslint/typescript-estree" "4.14.1"
eslint-scope "^5.0.0"
eslint-utils "^2.0.0"

"@typescript-eslint/parser@^4.14.0":
version "4.14.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/parser/-/parser-4.14.0.tgz#62d4cd2079d5c06683e9bfb200c758f292c4dee7"
integrity sha512-sUDeuCjBU+ZF3Lzw0hphTyScmDDJ5QVkyE21pRoBo8iDl7WBtVFS+WDN3blY1CH3SBt7EmYCw6wfmJjF0l/uYg==
"@typescript-eslint/parser@^4.14.1":
version "4.14.1"
resolved "https://registry.yarnpkg.com/@typescript-eslint/parser/-/parser-4.14.1.tgz#3bd6c24710cd557d8446625284bcc9c6d52817c6"
integrity sha512-mL3+gU18g9JPsHZuKMZ8Z0Ss9YP1S5xYZ7n68Z98GnPq02pYNQuRXL85b9GYhl6jpdvUc45Km7hAl71vybjUmw==
dependencies:
"@typescript-eslint/scope-manager" "4.14.0"
"@typescript-eslint/types" "4.14.0"
"@typescript-eslint/typescript-estree" "4.14.0"
"@typescript-eslint/scope-manager" "4.14.1"
"@typescript-eslint/types" "4.14.1"
"@typescript-eslint/typescript-estree" "4.14.1"
debug "^4.1.1"

"@typescript-eslint/[email protected].0":
version "4.14.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/scope-manager/-/scope-manager-4.14.0.tgz#55a4743095d684e1f7b7180c4bac2a0a3727f517"
integrity sha512-/J+LlRMdbPh4RdL4hfP1eCwHN5bAhFAGOTsvE6SxsrM/47XQiPSgF5MDgLyp/i9kbZV9Lx80DW0OpPkzL+uf8Q==
"@typescript-eslint/[email protected].1":
version "4.14.1"
resolved "https://registry.yarnpkg.com/@typescript-eslint/scope-manager/-/scope-manager-4.14.1.tgz#8444534254c6f370e9aa974f035ced7fe713ce02"
integrity sha512-F4bjJcSqXqHnC9JGUlnqSa3fC2YH5zTtmACS1Hk+WX/nFB0guuynVK5ev35D4XZbdKjulXBAQMyRr216kmxghw==
dependencies:
"@typescript-eslint/types" "4.14.0"
"@typescript-eslint/visitor-keys" "4.14.0"
"@typescript-eslint/types" "4.14.1"
"@typescript-eslint/visitor-keys" "4.14.1"

"@typescript-eslint/[email protected].0":
version "4.14.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/types/-/types-4.14.0.tgz#d8a8202d9b58831d6fd9cee2ba12f8a5a5dd44b6"
integrity sha512-VsQE4VvpldHrTFuVPY1ZnHn/Txw6cZGjL48e+iBxTi2ksa9DmebKjAeFmTVAYoSkTk7gjA7UqJ7pIsyifTsI4A==
"@typescript-eslint/[email protected].1":
version "4.14.1"
resolved "https://registry.yarnpkg.com/@typescript-eslint/types/-/types-4.14.1.tgz#b3d2eb91dafd0fd8b3fce7c61512ac66bd0364aa"
integrity sha512-SkhzHdI/AllAgQSxXM89XwS1Tkic7csPdndUuTKabEwRcEfR8uQ/iPA3Dgio1rqsV3jtqZhY0QQni8rLswJM2w==

"@typescript-eslint/[email protected].0":
version "4.14.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/typescript-estree/-/typescript-estree-4.14.0.tgz#4bcd67486e9acafc3d0c982b23a9ab8ac8911ed7"
integrity sha512-wRjZ5qLao+bvS2F7pX4qi2oLcOONIB+ru8RGBieDptq/SudYwshveORwCVU4/yMAd4GK7Fsf8Uq1tjV838erag==
"@typescript-eslint/[email protected].1":
version "4.14.1"
resolved "https://registry.yarnpkg.com/@typescript-eslint/typescript-estree/-/typescript-estree-4.14.1.tgz#20d3b8c8e3cdc8f764bdd5e5b0606dd83da6075b"
integrity sha512-M8+7MbzKC1PvJIA8kR2sSBnex8bsR5auatLCnVlNTJczmJgqRn8M+sAlQfkEq7M4IY3WmaNJ+LJjPVRrREVSHQ==
dependencies:
"@typescript-eslint/types" "4.14.0"
"@typescript-eslint/visitor-keys" "4.14.0"
"@typescript-eslint/types" "4.14.1"
"@typescript-eslint/visitor-keys" "4.14.1"
debug "^4.1.1"
globby "^11.0.1"
is-glob "^4.0.1"
lodash "^4.17.15"
semver "^7.3.2"
tsutils "^3.17.1"

"@typescript-eslint/[email protected].0":
version "4.14.0"
resolved "https://registry.yarnpkg.com/@typescript-eslint/visitor-keys/-/visitor-keys-4.14.0.tgz#b1090d9d2955b044b2ea2904a22496849acbdf54"
integrity sha512-MeHHzUyRI50DuiPgV9+LxcM52FCJFYjJiWHtXlbyC27b80mfOwKeiKI+MHOTEpcpfmoPFm/vvQS88bYIx6PZTA==
"@typescript-eslint/[email protected].1":
version "4.14.1"
resolved "https://registry.yarnpkg.com/@typescript-eslint/visitor-keys/-/visitor-keys-4.14.1.tgz#e93c2ff27f47ee477a929b970ca89d60a117da91"
integrity sha512-TAblbDXOI7bd0C/9PE1G+AFo7R5uc+ty1ArDoxmrC1ah61Hn6shURKy7gLdRb1qKJmjHkqu5Oq+e4Kt0jwf1IA==
dependencies:
"@typescript-eslint/types" "4.14.0"
"@typescript-eslint/types" "4.14.1"
eslint-visitor-keys "^2.0.0"

"@zkochan/cmd-shim@^3.1.0":
Expand Down

0 comments on commit ed829f2

Please sign in to comment.