Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: introduce Body #39483

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 136 additions & 43 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,141 @@ const cleanup = finished(rs, (err) => {
// ...
});
```
#### Class: `stream.Body`
Copy link
Member

@jasnell jasnell Jul 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it makes things a bit more complicated, I would generally prefer to separate out the Body mixin pieces here from the Node.js specific additional APIs. Or, if not that, let's not call this Body to avoid confusion. stream.Consumers perhaps?

Further, I think an API with static methods similar to would be nicer here... it gives us more options for wrapping the methods in various API specific ways.

const ab = await stream.consumers.arrayBuffer(readable);

// ...

const ab = await stream.consumers.blob(stream.compose(async function()* { yield 'hello'; }));

I really don't want to introduce yet another top level data encapsulation object given how many we already have.

<!-- YAML
added: REPLACEME
-->

<!--type=class-->

`new stream.Body` can be used to seamlessly convert between different types of
read/write stream interfaces, including async functions and iterables.

* `AsyncIterable` converts into a readable. Cannot yield
`null`.
* `AsyncGeneratorFunction` converts into a readable/writable.
Must take a source `AsyncIterable` as first parameter. Cannot yield
`null`.
* `AsyncFunction` converts into a writable. Must return
either `null` or `undefined`.

```mjs
import { Body, compose } from 'stream';
import { finished } from 'stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = new Body(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = new Body(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = new Body(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3).toNodeStream());
console.log(res); // prints 'HELLOWORLD'
```

### `body.arrayBuffer()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Returns a promise that fulfills with an {ArrayBuffer} containing a copy of
the body data.

### `body.blob()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Returns a promise that fulfills with an {Blob} containing a copy of the body data.

### `body.buffer()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Returns a promise that fulfills with an {Buffer} containing a copy of the body data.

### `body.nodeStream()`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we already have the node-to-web adapters, and your new compose function hopefully landing soon, I don't think we need these. Let's keep this focused on the accumulator funtions.

<!-- YAML
added: REPLACEME
-->

* Returns: {Duplex}

Returns a `stream.Duplex`.

### `body.readableNodeStream()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Readable}

Returns a `stream.Readable`.

### `body.writableNodeStream()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Readable}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Returns: {Readable}
* Returns: {Writable}


Returns a `stream.Writable`.

### `body.readableWebStream()`
<!-- YAML
added: REPLACEME
-->

* Returns: {ReadableStream}

Returns a `web.ReadableStream`.

### `body.writableWebStream()`
<!-- YAML
added: REPLACEME
-->

* Returns: {WritableStream}

Returns a `web.WritableStream`.

### `body.text()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Returns a promise that resolves the contents of the body decoded as a UTF-8
string.

### `body.json()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Returns a promise that resolves the contents of the body parsed as a UTF-8
JSON.

### `stream.pipeline(source[, ...transforms], destination, callback)`
### `stream.pipeline(streams, callback)`
Expand Down Expand Up @@ -1861,7 +1996,7 @@ failure, this can cause event listener leaks and swallowed errors.
added: REPLACEME
-->

* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]|Body[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
Expand Down Expand Up @@ -1901,48 +2036,6 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
console.log(res); // prints 'HELLOWORLD'
```

`stream.compose` can be used to convert async iterables, generators and
functions into streams.

* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
`null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
Must take a source `AsyncIterable` as first parameter. Cannot yield
`null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
either `null` or `undefined`.

```mjs
import { compose } from 'stream';
import { finished } from 'stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
```

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
Expand Down
Loading