From b70535ee7ecedfe3759dc378ade4887e57994081 Mon Sep 17 00:00:00 2001 From: Jamie King Date: Tue, 19 Mar 2024 18:21:09 -0700 Subject: [PATCH 1/2] feat: post message to worker when message event is emitted --- README.md | 21 +++++++++++++++++++++ index.d.ts | 20 ++++++++++++++++++++ index.js | 3 +++ test/on-message.js | 18 ++++++++++++++++++ test/post-message.test.js | 23 +++++++++++++++++++++++ 5 files changed, 85 insertions(+) create mode 100644 test/on-message.js create mode 100644 test/post-message.test.js diff --git a/README.md b/README.md index acb4c7b..80d1b3f 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,27 @@ stream.on('eventName', function (a, b, c, n, err) { }) ``` +### Post Messages + +You can post messages to the worker by emitting a `message` event on the ThreadStream. + +```js +const stream = new ThreadStream({ + filename: join(__dirname, 'worker.js'), + workerData: {}, +}) +stream.emit('message', message) +``` + +On your worker, you can listen for this message using [`worker.parentPort.on('message', cb)`](https://nodejs.org/api/worker_threads.html#event-message). + +```js +const { parentPort } = require('worker_threads') +parentPort.on('message', function (message) { + console.log('received:', message) +}) +``` + ## License MIT diff --git a/index.d.ts b/index.d.ts index 40f5d86..72e48ca 100644 --- a/index.d.ts +++ b/index.d.ts @@ -68,6 +68,26 @@ declare class ThreadStream extends EventEmitter { * @throws {Error} if the stream is already flushing, if it fails to flush or if it takes more than 10 seconds to flush. */ flushSync(): void + /** + * Synchronously calls each of the listeners registered for the event named`eventName`, in the order they were registered, passing the supplied arguments + * to each. + * + * @param eventName the name of the event. + * @param args the arguments to be passed to the event handlers. + * @returns {boolean} `true` if the event had listeners, `false` otherwise. + */ + emit(eventName: string | symbol, ...args: any[]): boolean { + return super.emit(eventName, ...args); + } + /** + * Post a message to the Worker with specified data and an optional list of transferable objects. + * + * @param eventName the name of the event, specifically 'message'. + * @param message message data to be sent to the Worker. + * @param transferList an optional list of transferable objects to be transferred to the Worker context. + * @returns {boolean} true if the event had listeners, false otherwise. + */ + emit(eventName: 'message', message: any, transferList?: Transferable[]): boolean } export = ThreadStream; diff --git a/index.js b/index.js index db65716..bf8b387 100644 --- a/index.js +++ b/index.js @@ -228,6 +228,9 @@ class ThreadStream extends EventEmitter { // TODO (fix): Make private? this.worker = createWorker(this, opts) // TODO (fix): make private + this.on('message', (message, transferList) => { + this.worker.postMessage(message, transferList) + }) } write (data) { diff --git a/test/on-message.js b/test/on-message.js new file mode 100644 index 0000000..4aaf09e --- /dev/null +++ b/test/on-message.js @@ -0,0 +1,18 @@ +'use strict' + +const { parentPort } = require('worker_threads') +const { Writable } = require('stream') + +function run () { + parentPort.once('message', function ({ text, takeThisPortPlease }) { + takeThisPortPlease.postMessage(`received: ${text}`) + }) + return new Writable({ + autoDestroy: true, + write (chunk, enc, cb) { + cb() + } + }) +} + +module.exports = run diff --git a/test/post-message.test.js b/test/post-message.test.js new file mode 100644 index 0000000..9b5d08d --- /dev/null +++ b/test/post-message.test.js @@ -0,0 +1,23 @@ +'use strict' + +const { test } = require('tap') +const { join } = require('path') +const { once } = require('events') +const ThreadStream = require('..') + +test('message events emitted on the stream are posted to the worker', async function (t) { + t.plan(1) + + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'on-message.js'), + sync: false + }) + t.teardown(() => { + stream.end() + }) + + stream.emit('message', { text: 'hello', takeThisPortPlease: port1 }, [port1]) + const [confirmation] = await once(port2, 'message') + t.equal(confirmation, 'received: hello') +}) From 4c41be7bc5e1cf90ef6159ac371f1ec520350781 Mon Sep 17 00:00:00 2001 From: Jamie King Date: Fri, 22 Mar 2024 15:15:36 -0700 Subject: [PATCH 2/2] test: added missing MessageChannel import --- test/post-message.test.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/post-message.test.js b/test/post-message.test.js index 9b5d08d..a266ca7 100644 --- a/test/post-message.test.js +++ b/test/post-message.test.js @@ -3,6 +3,7 @@ const { test } = require('tap') const { join } = require('path') const { once } = require('events') +const { MessageChannel } = require('worker_threads') const ThreadStream = require('..') test('message events emitted on the stream are posted to the worker', async function (t) {