Skip to content

Commit

Permalink
fix: update
Browse files Browse the repository at this point in the history
  • Loading branch information
izatop committed Jul 20, 2023
1 parent 9da3a3c commit 200d526
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 79 deletions.
1 change: 1 addition & 0 deletions packages/async/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"dependencies": {
"@bunt/assert": "^0.29.0",
"@bunt/is": "^0.29.0",
"@bunt/type": "^0.29.0",
"@bunt/util": "^0.29.0"
},
"publishConfig": {
Expand Down
84 changes: 84 additions & 0 deletions packages/async/src/AsyncCallback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import {Fn, Promisify} from "@bunt/type";
import {isUndefined} from "@bunt/is";

export class AsyncCallback<T> implements AsyncIterable<T> {
readonly #disposables: Fn[] = [];
readonly #pipeline: Fn<[T | undefined]>[] = [];
readonly #queue: T[] = [];

constructor(link: (emit: Fn<[data: T]>) => Promisify<() => void>) {
this.#disposables.push(this.pipe);

Promise.resolve(link(this.push))
.then((dispose) => {
this.#disposables.push(dispose);
});
}

public push = (value: T): void => {
if (this.#pipeline.length) {
return this.pipe(value);
}

this.#queue.push(value);
};

public pull = (): Promise<T | undefined> => {
const value = this.#queue.shift();
if (value) {
return Promise.resolve(value);
}

return new Promise<T | undefined>(this.sync);
};

public [Symbol.asyncIterator](): AsyncIterator<T> {
return {
next: async (): Promise<IteratorResult<T>> => {
return this.pull()
.then(this.asResult);
},
return: async (value?: T | PromiseLike<T>): Promise<IteratorResult<T>> => {
await this.dispose();

return Promise.resolve(value)
.then(this.asResult);
},
throw: async (e?): Promise<IteratorResult<T>> => {
await this.dispose();

return Promise.reject(e);
},
};
}

public getAsyncIterator(): AsyncIterator<T> {
return this[Symbol.asyncIterator]();
}

public async dispose(): Promise<void> {
const pending = this.#disposables.map((fn) => Promise.resolve(fn()));

return Promise
.all(pending)
.then(() => void 0);
}

private asResult = (value?: T): IteratorResult<T> => {
return isUndefined(value) ? {value, done: true} : {value, done: false};
};

private pipe = (value?: T): void => {
this.#pipeline.splice(0, this.#pipeline.length)
.forEach((resolve) => resolve(value));
};

private sync = (resolve: Fn<[T?]>): void => {
const value = this.#queue.shift();
if (value) {
return resolve(value);
}

this.#pipeline.push(resolve);
};
}
1 change: 1 addition & 0 deletions packages/async/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from "./Defer.js";
export * from "./AsyncIteratorFactory.js";
export * from "./AsyncLoader.js";
export * from "./AsyncSingleCall.js";
export * from "./AsyncCallback.js";
1 change: 1 addition & 0 deletions packages/ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
},
"dependencies": {
"@bunt/app": "^0.29.2",
"@bunt/async": "^0.29.0",
"@bunt/unit": "^0.29.0",
"@bunt/util": "^0.29.0",
"@bunt/web": "^0.29.3",
Expand Down
4 changes: 3 additions & 1 deletion packages/ws/src/Connection/ClientConnectionAbstract.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import ws from "ws";
import {assert, AsyncCallback, filterValueCallback, resolveOrReject} from "@bunt/util";
import {filterValueCallback, resolveOrReject} from "@bunt/util";
import {assert} from "@bunt/assert";
import {AsyncCallback} from "@bunt/async";
import {IClientConnection} from "./interface.js";

export abstract class ClientConnectionAbstract<T> implements IClientConnection<T> {
Expand Down
2 changes: 1 addition & 1 deletion packages/ws/src/Protocol/GQL/GQLProtoHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {GQLClientPayload} from "./interfaces.js";
export abstract class GQLProtoHandle<C extends Context,
S extends StateType | null = null> extends ProtoHandleAbstract<C, S> {

public static protocol = "graphql-ws";
public static protocol = "graphql-transport-ws";

readonly #connection = new GQLClientConnection(this.getShadowState());

Expand Down
64 changes: 38 additions & 26 deletions packages/ws/src/Protocol/GQL/GQLProtoLayer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {assert, isArray, isFunction, isObject} from "@bunt/util";
import {GQLClientConnection} from "./GQLClientConnection.js";
import {IGQLOperationStop} from "./interfaces.js";
import {GQLOperationType, IGQLOperationComplete} from "./interfaces.js";
import {
GQLClientOperation,
GQLClientOperationType,
Expand All @@ -13,7 +13,10 @@ import {
} from "./index.js";

// @TODO Upgrade type validation to input schema validation
const AllowTypes: string[] = Object.values(GQLClientOperationType);
const AllowTypes: string[] = [
...Object.values(GQLClientOperationType),
...Object.values(GQLOperationType),
];

/**
* @final
Expand All @@ -30,47 +33,58 @@ export class GQLProtoLayer {
this.#initialize = init;
this.#subscribe = factory;

const interval = setInterval(() => this.keepAliveUpdate(), 30000);
const interval = setInterval(() => this.ping(), 30000);
this.#client.on("close", () => this.unsubscribeAll());
this.#client.on("close", () => clearInterval(interval));
}

public async handle(operation: GQLOperationMessage): Promise<void> {
assert(this.isClientOperation(operation), "Wrong the Operation Message");
switch (operation.type) {
case GQLClientOperationType.CONNECTION_INIT:
case GQLClientOperationType.ConnectionInit:
Object.assign(this.#params, operation.payload);
await this.#client.send({type: GQLServerOperationType.CONNECTION_ACK});
await this.keepAliveUpdate();
await Promise.resolve(this.#initialize(this.#params));
await this.#client.send({type: GQLServerOperationType.ConnectionAck});
break;
case GQLClientOperationType.CONNECTION_TERMINATE:
this.terminate();

case GQLOperationType.Pong:
break;

case GQLOperationType.Ping:
await this.#client.send({type: GQLOperationType.Pong});
break;

case GQLOperationType.Complete:
this.unsubscribe(operation);
break;
case GQLClientOperationType.START:

case GQLClientOperationType.Subscribe:
this.createSubscription(operation.id, operation.payload)
// eslint-disable-next-line
.catch(console.error);
break;
case GQLClientOperationType.STOP:
this.stopOperation(operation);
}
}

private stopOperation(operation: IGQLOperationStop): void {
private unsubscribe(operation: IGQLOperationComplete): void {
const subscription = this.#subscriptions.get(operation.id);
if (subscription) {
this.#subscriptions.delete(operation.id);
subscription.return?.();
}
}

private keepAliveUpdate(): Promise<void> {
return this.#client.send({type: GQLServerOperationType.CONNECTION_KEEP_ALIVE});
private ping(): Promise<void> {
return this.#client.send({type: GQLOperationType.Ping});
}

private pong(): Promise<void> {
return this.#client.send({type: GQLOperationType.Pong});
}

private unsubscribeAll(): void {
for (const subscription of this.#subscriptions.values()) {
for (const [id, subscription] of this.#subscriptions.entries()) {
this.#subscriptions.delete(id);
subscription.return?.();
}
}
Expand All @@ -82,14 +96,19 @@ export class GQLProtoLayer {

this.#subscriptions.set(id, subscription);
for await (const next of subscription) {
await this.#client.send({id, type: GQLServerOperationType.DATA, payload: next});
await this.#client.send({id, type: GQLServerOperationType.Next, payload: next});
}

await this.#client.send({id, type: GQLServerOperationType.COMPLETE});
await this.#client.send({id, type: GQLOperationType.Complete});
} catch (error) {
if (this.#client.ready) {
await this.#client.send({id, type: GQLServerOperationType.ERROR, payload: this.serializeError(error)});
await this.#client.send({id, type: GQLServerOperationType.COMPLETE});
await this.#client.send({
id,
type: GQLServerOperationType.Error,
payload: [this.serializeError(error)],
});

await this.#client.send({id, type: GQLOperationType.Complete});
}
}
}
Expand All @@ -113,11 +132,4 @@ export class GQLProtoLayer {

return {message: "Unknown error", code: 500};
}

private terminate(): void {
for (const [id, subscription] of this.#subscriptions.entries()) {
this.#subscriptions.delete(id);
subscription.return?.();
}
}
}
Loading

0 comments on commit 200d526

Please sign in to comment.