Skip to content

Commit

Permalink
Remove p-event dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
novemberborn committed Jul 3, 2023
1 parent 7533020 commit f047694
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 72 deletions.
2 changes: 1 addition & 1 deletion lib/plugin-support/shared-workers.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export async function observeWorkerProcess(fork, runStatus) {
try {
await launched.statePromises.available;

port.postMessage({type: 'ready'});
port.postMessage({ava: {type: 'ready'}});

launched.worker.postMessage({
type: 'register-test-worker',
Expand Down
48 changes: 10 additions & 38 deletions lib/worker/channel.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,14 @@ const timers = require('../now-and-timers.cjs');

const {isRunningInChildProcess, isRunningInThread} = require('./utils.cjs');

let pEvent = async (emitter, event, options) => {
// We need to import p-event, but import() is asynchronous. Buffer any events
// emitted in the meantime. Don't handle errors.
const buffer = [];
const addToBuffer = (...args) => buffer.push(args);
emitter.on(event, addToBuffer);

try {
({pEvent} = await import('p-event'));
} finally {
emitter.off(event, addToBuffer);
}

if (buffer.length === 0) {
return pEvent(emitter, event, options);
}

// Now replay buffered events.
const replayEmitter = new events.EventEmitter();
const promise = pEvent(replayEmitter, event, options);
for (const args of buffer) {
replayEmitter.emit(event, ...args);
}

const replay = (...args) => replayEmitter.emit(event, ...args);
emitter.on(event, replay);

try {
return await promise;
} finally {
emitter.off(event, replay);
const selectAvaMessage = async (channel, type) => {
for await (const [message] of events.on(channel, 'message')) {
if (message.ava?.type === type) {
return message;
}
}
};

const selectAvaMessage = type => message => message.ava && message.ava.type === type;

class RefCounter {
constructor() {
this.count = 0;
Expand Down Expand Up @@ -133,8 +105,8 @@ if (isRunningInChildProcess) {
// Node.js. In order to keep track, explicitly reference before attaching.
handle.ref();

exports.options = pEvent(handle.channel, 'message', selectAvaMessage('options')).then(message => message.ava.options); // eslint-disable-line unicorn/prefer-top-level-await
exports.peerFailed = pEvent(handle.channel, 'message', selectAvaMessage('peer-failed'));
exports.options = selectAvaMessage(handle.channel, 'options').then(message => message.ava.options); // eslint-disable-line unicorn/prefer-top-level-await
exports.peerFailed = selectAvaMessage(handle.channel, 'peer-failed'); // eslint-disable-line unicorn/prefer-top-level-await
exports.send = handle.send.bind(handle);
exports.unref = handle.unref.bind(handle);

Expand All @@ -143,7 +115,7 @@ async function flush() {
handle.ref();
const promise = pendingPings.then(async () => {
handle.send({type: 'ping'});
await pEvent(handle.channel, 'message', selectAvaMessage('pong'));
await selectAvaMessage(handle.channel, 'pong');
if (promise === pendingPings) {
handle.unref();
}
Expand Down Expand Up @@ -202,7 +174,7 @@ function registerSharedWorker(filename, initialData) {
// The attaching of message listeners will cause the port to be referenced by
// Node.js. In order to keep track, explicitly reference before attaching.
sharedWorkerHandle.ref();
const ready = pEvent(ourPort, 'message', ({type}) => type === 'ready').then(() => {
const ready = selectAvaMessage(ourPort, 'ready').then(() => {
currentlyAvailable = error === null;
}).finally(() => {
// Once ready, it's up to user code to subscribe to messages, which (see
Expand All @@ -214,7 +186,7 @@ function registerSharedWorker(filename, initialData) {

// Errors are received over the test worker channel, not the message port
// dedicated to the shared worker.
pEvent(channelEmitter, 'shared-worker-error').then(() => {
events.once(channelEmitter, 'shared-worker-error').then(() => {
unsubscribe();
sharedWorkerHandle.forceUnref();
error = new Error('The shared worker is no longer available');
Expand Down
26 changes: 0 additions & 26 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
"matcher": "^5.0.0",
"mem": "^9.0.2",
"ms": "^2.1.3",
"p-event": "^6.0.0",
"p-map": "^6.0.0",
"picomatch": "^2.3.1",
"pkg-conf": "^4.0.0",
Expand Down
11 changes: 5 additions & 6 deletions test-tap/fixture/fail-fast/multiple-files/passes-slow.cjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const events = require('node:events');
const {parentPort} = require('node:worker_threads');

const test = require('../../../../entrypoints/main.cjs');
Expand All @@ -6,14 +7,12 @@ test.serial('first pass', async t => {
t.pass();
const timer = setTimeout(() => {}, 60_000); // Ensure process stays alive.
const source = parentPort || process;
const {pEvent} = await import('p-event');
await pEvent(source, 'message', message => {
if (message.ava) {
return message.ava.type === 'peer-failed';
for await (const [message] of events.on(source, 'message')) {
if (message.ava?.type === 'peer-failed') {
break;
}
}

return false;
});
clearTimeout(timer);
});

Expand Down

0 comments on commit f047694

Please sign in to comment.