From f153190800bfa2648e0256f050a035798586054e Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 31 Oct 2022 15:57:02 +0200 Subject: [PATCH] stream: add compose operator PR-URL: https://github.com/nodejs/node/pull/44937 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Robert Nagy --- doc/api/stream.md | 39 ++++++ lib/internal/streams/operators.js | 32 +++++ test/parallel/test-stream-compose-operator.js | 127 ++++++++++++++++++ test/parallel/test-stream-compose.js | 27 ++-- 4 files changed, 210 insertions(+), 15 deletions(-) create mode 100644 test/parallel/test-stream-compose-operator.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 5b65340f494685..ea0f5740737fb6 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1681,6 +1681,41 @@ option. In the code example above, data will be in a single chunk if the file has less then 64 KiB of data because no `highWaterMark` option is provided to [`fs.createReadStream()`][]. +##### `readable.compose(stream[, options])` + + + +> Stability: 1 - Experimental + +* `stream` {Stream|Iterable|AsyncIterable|Function} +* `options` {Object} + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Duplex} a stream composed with the stream `stream`. + +```mjs +import { Readable } from 'node:stream'; + +async function* splitToWords(source) { + for await (const chunk of source) { + const words = String(chunk).split(' '); + + for (const word of words) { + yield word; + } + } +} + +const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords); +const words = await wordsStream.toArray(); + +console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator'] +``` + +See [`stream.compose`][] for more information. + ##### `readable.iterator([options])`