Skip to content

Commit

Permalink
fixup! feat(node): worker_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Mesteery committed Aug 26, 2021
1 parent cce9484 commit 0612616
Show file tree
Hide file tree
Showing 4 changed files with 340 additions and 117 deletions.
84 changes: 43 additions & 41 deletions node/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ export interface WrappedFunction extends Function {
listener: GenericFunction;
}

export interface OnceableEventEmitter {
once(event: string | symbol, listener: GenericFunction): unknown;
removeListener(
event: string | symbol,
listener: GenericFunction,
): unknown;
}

export interface OnableEventEmitter {
on(event: string | symbol, listener: GenericFunction): unknown;
removeListener(
event: string | symbol,
listener: GenericFunction,
): unknown;
}

function ensureArray<T>(maybeArray: T[] | T): T[] {
return Array.isArray(maybeArray) ? maybeArray : [maybeArray];
}
Expand Down Expand Up @@ -175,8 +191,8 @@ export class EventEmitter {
* Returns an array listing the events for which the emitter has
* registered listeners.
*/
public eventNames(): [string | symbol] {
return Reflect.ownKeys(this._events) as [string | symbol];
public eventNames(): (string | symbol)[] {
return Reflect.ownKeys(this._events);
}

/**
Expand Down Expand Up @@ -483,51 +499,37 @@ export class EventEmitter {
* will resolve with an array of all the arguments emitted to the given event.
*/
public static once(
emitter: EventEmitter | EventTarget,
name: string,
emitter: OnceableEventEmitter,
name: string | symbol,
// deno-lint-ignore no-explicit-any
): Promise<any[]> {
return new Promise((resolve, reject) => {
if (emitter instanceof EventTarget) {
// EventTarget does not have `error` event semantics like Node
// EventEmitters, we do not listen to `error` events here.
emitter.addEventListener(
name,
(...args) => {
resolve(args);
},
{ once: true, passive: false, capture: false },
);
return;
} else if (emitter instanceof EventEmitter) {
// deno-lint-ignore no-explicit-any
const eventListener = (...args: any[]): void => {
if (errorListener !== undefined) {
emitter.removeListener("error", errorListener);
}
resolve(args);
};
let errorListener: GenericFunction;

// Adding an error listener is not optional because
// if an error is thrown on an event emitter we cannot
// guarantee that the actual event we are waiting will
// be fired. The result could be a silent way to create
// memory or file descriptor leaks, which is something
// we should avoid.
if (name !== "error") {
// deno-lint-ignore no-explicit-any
const eventListener = (...args: any[]): void => {
if (errorListener !== undefined) {
emitter.removeListener("error", errorListener);
}
resolve(args);
errorListener = (err: any): void => {
emitter.removeListener(name, eventListener);
reject(err);
};
let errorListener: GenericFunction;

// Adding an error listener is not optional because
// if an error is thrown on an event emitter we cannot
// guarantee that the actual event we are waiting will
// be fired. The result could be a silent way to create
// memory or file descriptor leaks, which is something
// we should avoid.
if (name !== "error") {
// deno-lint-ignore no-explicit-any
errorListener = (err: any): void => {
emitter.removeListener(name, eventListener);
reject(err);
};

emitter.once("error", errorListener);
}

emitter.once(name, eventListener);
return;
emitter.once("error", errorListener);
}

emitter.once(name, eventListener);
});
}

Expand All @@ -538,7 +540,7 @@ export class EventEmitter {
* emitted event arguments.
*/
public static on(
emitter: EventEmitter,
emitter: OnableEventEmitter,
event: string | symbol,
): AsyncIterable {
// deno-lint-ignore no-explicit-any
Expand Down
29 changes: 29 additions & 0 deletions node/testdata/worker_threads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {
getEnvironmentData,
isMainThread,
parentPort,
threadId,
workerData,
} from "../worker_threads.ts";
import { once } from "../events.ts";

async function message(expectedMessage: string) {
const [message] = await once(parentPort, "message");
if (message !== expectedMessage) {
// fail test
parentPort.close();
}
}

await message("Hello, how are you my thread?");
parentPort.postMessage("I'm fine!");

parentPort.postMessage({
isMainThread,
threadId,
workerData: Array.isArray(workerData) &&
workerData[workerData.length - 1] instanceof MessagePort
? workerData.slice(0, -1)
: workerData,
envData: [getEnvironmentData("test"), getEnvironmentData(1)],
});
Loading

0 comments on commit 0612616

Please sign in to comment.