From 4918a155dd2d2fa4f3bf87e5bf98ea7deabddaf6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 18 Jun 2021 08:08:50 +0200 Subject: [PATCH 1/8] stream: add stream.compose Refs: #32020 --- doc/api/stream.md | 87 +++++ lib/internal/streams/compose.js | 288 ++++++++++++++++ lib/internal/streams/pipeline.js | 3 - lib/stream.js | 2 + test/parallel/test-bootstrap-modules.js | 1 + test/parallel/test-stream-compose.js | 420 ++++++++++++++++++++++++ 6 files changed, 798 insertions(+), 3 deletions(-) create mode 100644 lib/internal/streams/compose.js create mode 100644 test/parallel/test-stream-compose.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 603e0407a8c60c..843f59c51d17fe 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1859,6 +1859,93 @@ run().catch(console.error); after the `callback` has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors. +### `stream.compose(...streams)` + + +* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]} +* Returns: {stream.Duplex} + +Combines two or more streams into a `Duplex` stream that writes to the +first stream and reads from the last. Each provided stream is piped into +the next, using `stream.pipeline`. If any of the streams error then all +are destroyed, including the outer `Duplex` stream. + +Because `stream.compose` returns a new stream that in turn can (and +should) be piped into other streams, it enables composition. In contrast, +when passing streams to `stream.pipeline`, typically the first stream is +a readable stream and the last a writable stream, forming a closed +circuit. + +If passed a `Function` is must be a factory method taking a `source` +`Iterable`. + +```mjs +import { compose, Transform } from 'stream'; + +const removeSpaces = new Transform({ + transform(chunk, encoding, callback) { + callback(null, String(chunk).replace(' ', '')); + } +}); + +async function* toUpper(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +} + +let res = ''; +for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { + res += buf; +} + +console.log(res); // prints 'HELLOWORLD' +``` + +`stream.compose` can be used to convert async iterables, generators and +functions into streams. + +* `AsyncIterable` converts into a readable `Duplex`. Cannot yield + `null`. +* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`. + Must take a source `AsyncIterable` as first parameter. Cannot yield + `null`. +* `AsyncFunction` converts into a writable `Duplex`. Must return + either `null` or `undefined`. + +```mjs +import { compose } from 'stream'; +import { finished } from 'stream/promises'; + +// Convert AsyncIterable into readable Duplex. +const s1 = compose(async function*() { + yield 'Hello'; + yield 'World'; +}()); + +// Convert AsyncGenerator into transform Duplex. +const s2 = compose(async function*(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +}); + +let res = ''; + +// Convert AsyncFunction into writable Duplex. +const s3 = compose(async function(source) { + for await (const chunk of source) { + res += chunk; + } +}); + +await finished(compose(s1, s2, s3)); + +console.log(res); // prints 'HELLOWORLD' +``` + ### `stream.Readable.from(iterable, [options])` -* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]} +* `streams` {Stream[]} * Returns: {stream.Duplex} Combines two or more streams into a `Duplex` stream that writes to the @@ -1878,9 +1878,6 @@ when passing streams to `stream.pipeline`, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit. -If passed a `Function` is must be a factory method taking a `source` -`Iterable`. - ```mjs import { compose, Transform } from 'stream'; @@ -1890,11 +1887,11 @@ const removeSpaces = new Transform({ } }); -async function* toUpper(source) { - for await (const chunk of source) { - yield String(chunk).toUpperCase(); +const toUpper = new Transform({ + transform(chunk, encoding, callback) { + callback(null, String(chunk).toUpperCase()); } -} +}); let res = ''; for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { @@ -1904,48 +1901,6 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { console.log(res); // prints 'HELLOWORLD' ``` -`stream.compose` can be used to convert async iterables, generators and -functions into streams. - -* `AsyncIterable` converts into a readable `Duplex`. Cannot yield - `null`. -* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`. - Must take a source `AsyncIterable` as first parameter. Cannot yield - `null`. -* `AsyncFunction` converts into a writable `Duplex`. Must return - either `null` or `undefined`. - -```mjs -import { compose } from 'stream'; -import { finished } from 'stream/promises'; - -// Convert AsyncIterable into readable Duplex. -const s1 = compose(async function*() { - yield 'Hello'; - yield 'World'; -}()); - -// Convert AsyncGenerator into transform Duplex. -const s2 = compose(async function*(source) { - for await (const chunk of source) { - yield String(chunk).toUpperCase(); - } -}); - -let res = ''; - -// Convert AsyncFunction into writable Duplex. -const s3 = compose(async function(source) { - for await (const chunk of source) { - res += chunk; - } -}); - -await finished(compose(s1, s2, s3)); - -console.log(res); // prints 'HELLOWORLD' -``` - ### `stream.Readable.from(iterable, [options])`