stream: add compose operator
PR-URL: nodejs#44937
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
rluvaton authored and lucshi committed Nov 9, 2022
1 parent 3c18375 commit f153190
Showing 4 changed files with 210 additions and 15 deletions.
39 changes: 39 additions & 0 deletions doc/api/stream.md
@@ -1681,6 +1681,41 @@ option. In the code example above, data will be in a single chunk if the file
has less than 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable.compose(stream[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `stream` {Stream|Iterable|AsyncIterable|Function}
* `options` {Object}
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Duplex} a stream composed with the stream `stream`.

```mjs
import { Readable } from 'node:stream';

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ');

    for (const word of words) {
      yield word;
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
```

See [`stream.compose`][] for more information.
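
The following sketch (illustrative only; not part of this commit) shows how the
`signal` option documented above can be used to tear down a composed stream:
aborting the controller destroys the stream, and pending reads reject with an
`AbortError`.

```mjs
import { Readable } from 'node:stream';

const ac = new AbortController();

const composed = Readable.from(['a', 'b', 'c'])
  .compose(async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }, { signal: ac.signal });

// Aborting before (or while) consuming destroys the composed stream.
ac.abort();

try {
  await composed.toArray();
} catch (err) {
  console.log(err.name); // prints 'AbortError'
}
```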

##### `readable.iterator([options])`

<!-- YAML
@@ -2720,6 +2755,8 @@ await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD'
```

See [`readable.compose(stream)`][] for using `stream.compose` as an operator.
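
As an illustration only (this snippet is not part of the commit), the operator
composes the receiver with its argument, so the two forms below build
equivalent streams:

```mjs
import { compose, Readable } from 'node:stream';

async function* exclaim(source) {
  for await (const chunk of source) {
    yield `${chunk}!`;
  }
}

// Module-level form: compose(source, transform)
const viaHelper = compose(Readable.from(['a', 'b']), exclaim);
// Operator form, equivalent to the line above
const viaOperator = Readable.from(['a', 'b']).compose(exclaim);

console.log(await viaHelper.toArray()); // prints ['a!', 'b!']
console.log(await viaOperator.toArray()); // prints ['a!', 'b!']
```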

### `stream.Readable.from(iterable[, options])`

<!-- YAML
@@ -4487,11 +4524,13 @@ contain multi-byte characters.
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`readable._read()`]: #readable_readsize
[`readable.compose(stream)`]: #readablecomposestream-options
[`readable.map`]: #readablemapfn-options
[`readable.push('')`]: #readablepush
[`readable.setEncoding()`]: #readablesetencodingencoding
[`stream.Readable.from()`]: #streamreadablefromiterable-options
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
[`stream.compose`]: #streamcomposestreams
[`stream.cork()`]: #writablecork
[`stream.finished()`]: #streamfinishedstream-options-callback
[`stream.pipe()`]: #readablepipedestination-options
32 changes: 32 additions & 0 deletions lib/internal/streams/operators.js
@@ -4,6 +4,7 @@ const { AbortController } = require('internal/abort_controller');

const {
  codes: {
    ERR_INVALID_ARG_VALUE,
    ERR_INVALID_ARG_TYPE,
    ERR_MISSING_ARGS,
    ERR_OUT_OF_RANGE,
@@ -17,6 +18,11 @@ const {
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
const staticCompose = require('internal/streams/compose');
const {
  addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');
const { isWritable, isNodeStream } = require('internal/streams/utils');

const {
  ArrayPrototypePush,
@@ -32,6 +38,31 @@ const {
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

function compose(stream, options) {
  if (options != null) {
    validateObject(options, 'options');
  }
  if (options?.signal != null) {
    validateAbortSignal(options.signal, 'options.signal');
  }

  if (isNodeStream(stream) && !isWritable(stream)) {
    throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
  }

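  // Delegate to the module-level `stream.compose()` implementation
  // (internal/streams/compose), using the receiver as the first stream.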
  const composedStream = staticCompose(this, stream);

  if (options?.signal) {
    // Not validating as we already validated before
    addAbortSignalNoValidate(
      options.signal,
      composedStream
    );
  }

  return composedStream;
}

function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
@@ -392,6 +423,7 @@ module.exports.streamReturningOperators = {
  flatMap,
  map,
  take,
  compose,
};

module.exports.promiseReturningOperators = {
127 changes: 127 additions & 0 deletions test/parallel/test-stream-compose-operator.js
@@ -0,0 +1,127 @@
'use strict';

const common = require('../common');
const {
  Readable, Transform,
} = require('stream');
const assert = require('assert');

{
  // with async generator
  const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
    let str = '';
    for await (const chunk of stream) {
      str += chunk;

      if (str.length === 2) {
        yield str;
        str = '';
      }
    }
  });
  const result = ['ab', 'cd'];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
  })().then(common.mustCall());
}

{
  // With Transformer
  const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
    objectMode: true,
    transform: common.mustCall((chunk, encoding, callback) => {
      callback(null, chunk);
    }, 4)
  }));
  const result = ['a', 'b', 'c', 'd'];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
  })().then(common.mustCall());
}

{
  // Throwing an error during `compose` (before waiting for data)
  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

    throw new Error('boom');
  });

  assert.rejects(async () => {
    for await (const item of stream) {
      assert.fail('should not reach here, got ' + item);
    }
  }, /boom/).then(common.mustCall());
}

{
  // Throwing an error during `compose` (when waiting for data)
  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
    for await (const chunk of stream) {
      if (chunk === 3) {
        throw new Error('boom');
      }
      yield chunk;
    }
  });

  assert.rejects(
    stream.toArray(),
    /boom/,
  ).then(common.mustCall());
}

{
  // Throwing an error during `compose` (after finishing all readable data)
  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield

    // eslint-disable-next-line no-unused-vars,no-empty
    for await (const chunk of stream) {
    }

    throw new Error('boom');
  });
  assert.rejects(
    stream.toArray(),
    /boom/,
  ).then(common.mustCall());
}

{
  // AbortSignal
  const ac = new AbortController();
  const stream = Readable.from([1, 2, 3, 4, 5])
    .compose(async function *(source) {
      // Should not reach here
      for await (const chunk of source) {
        yield chunk;
      }
    }, { signal: ac.signal });

  ac.abort();

  assert.rejects(async () => {
    for await (const item of stream) {
      assert.fail('should not reach here, got ' + item);
    }
  }, {
    name: 'AbortError',
  }).then(common.mustCall());
}

{
  assert.throws(
    () => Readable.from(['a']).compose(Readable.from(['b'])),
    { code: 'ERR_INVALID_ARG_VALUE' }
  );
}

{
  assert.throws(
    () => Readable.from(['a']).compose(),
    { code: 'ERR_INVALID_ARG_TYPE' }
  );
}
27 changes: 12 additions & 15 deletions test/parallel/test-stream-compose.js
@@ -358,27 +358,24 @@ const assert = require('assert');
}

{
  try {
    compose();
  } catch (err) {
    assert.strictEqual(err.code, 'ERR_MISSING_ARGS');
  }
  assert.throws(
    () => compose(),
    { code: 'ERR_MISSING_ARGS' }
  );
}

{
  try {
    compose(new Writable(), new PassThrough());
  } catch (err) {
    assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
  }
  assert.throws(
    () => compose(new Writable(), new PassThrough()),
    { code: 'ERR_INVALID_ARG_VALUE' }
  );
}

{
  try {
    compose(new PassThrough(), new Readable({ read() {} }), new PassThrough());
  } catch (err) {
    assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
  }
  assert.throws(
    () => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
    { code: 'ERR_INVALID_ARG_VALUE' }
  );
}

{
