diff --git a/README.md b/README.md index acb4c7b..80d1b3f 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,27 @@ stream.on('eventName', function (a, b, c, n, err) { }) ``` +### Post Messages + +You can post messages to the worker by emitting a `message` event on the ThreadStream. + +```js +const stream = new ThreadStream({ + filename: join(__dirname, 'worker.js'), + workerData: {}, +}) +stream.emit('message', message) +``` + +On your worker, you can listen for this message using [`worker.parentPort.on('message', cb)`](https://nodejs.org/api/worker_threads.html#event-message). + +```js +const { parentPort } = require('worker_threads') +parentPort.on('message', function (message) { + console.log('received:', message) +}) +``` + ## License MIT diff --git a/index.d.ts b/index.d.ts index 40f5d86..72e48ca 100644 --- a/index.d.ts +++ b/index.d.ts @@ -68,6 +68,26 @@ declare class ThreadStream extends EventEmitter { * @throws {Error} if the stream is already flushing, if it fails to flush or if it takes more than 10 seconds to flush. */ flushSync(): void + /** + * Synchronously calls each of the listeners registered for the event named`eventName`, in the order they were registered, passing the supplied arguments + * to each. + * + * @param eventName the name of the event. + * @param args the arguments to be passed to the event handlers. + * @returns {boolean} `true` if the event had listeners, `false` otherwise. + */ + emit(eventName: string | symbol, ...args: any[]): boolean { + return super.emit(eventName, ...args); + } + /** + * Post a message to the Worker with specified data and an optional list of transferable objects. + * + * @param eventName the name of the event, specifically 'message'. + * @param message message data to be sent to the Worker. + * @param transferList an optional list of transferable objects to be transferred to the Worker context. + * @returns {boolean} true if the event had listeners, false otherwise. + */ + emit(eventName: 'message', message: any, transferList?: Transferable[]): boolean } export = ThreadStream; diff --git a/index.js b/index.js index db65716..bf8b387 100644 --- a/index.js +++ b/index.js @@ -228,6 +228,9 @@ class ThreadStream extends EventEmitter { // TODO (fix): Make private? this.worker = createWorker(this, opts) // TODO (fix): make private + this.on('message', (message, transferList) => { + this.worker.postMessage(message, transferList) + }) } write (data) { diff --git a/test/on-message.js b/test/on-message.js new file mode 100644 index 0000000..4aaf09e --- /dev/null +++ b/test/on-message.js @@ -0,0 +1,18 @@ +'use strict' + +const { parentPort } = require('worker_threads') +const { Writable } = require('stream') + +function run () { + parentPort.once('message', function ({ text, takeThisPortPlease }) { + takeThisPortPlease.postMessage(`received: ${text}`) + }) + return new Writable({ + autoDestroy: true, + write (chunk, enc, cb) { + cb() + } + }) +} + +module.exports = run diff --git a/test/post-message.test.js b/test/post-message.test.js new file mode 100644 index 0000000..a266ca7 --- /dev/null +++ b/test/post-message.test.js @@ -0,0 +1,24 @@ +'use strict' + +const { test } = require('tap') +const { join } = require('path') +const { once } = require('events') +const { MessageChannel } = require('worker_threads') +const ThreadStream = require('..') + +test('message events emitted on the stream are posted to the worker', async function (t) { + t.plan(1) + + const { port1, port2 } = new MessageChannel() + const stream = new ThreadStream({ + filename: join(__dirname, 'on-message.js'), + sync: false + }) + t.teardown(() => { + stream.end() + }) + + stream.emit('message', { text: 'hello', takeThisPortPlease: port1 }, [port1]) + const [confirmation] = await once(port2, 'message') + t.equal(confirmation, 'received: hello') +})