Skip to content

Commit

Permalink
feat: add full NodeJS.ReadableStream support
Browse files Browse the repository at this point in the history
  • Loading branch information
ctjlewis committed Mar 3, 2023
1 parent 35e3106 commit fabf6e6
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 165 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ jobs:
strategy:
matrix:
nodejs: [
# 14,
# 16,
14,
16,
18,
# "lts/*"
]
Expand Down
110 changes: 11 additions & 99 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,106 +1,18 @@
# `yield-stream`

- **Docs: https://yield-stream.vercel.app**
[**GitHub**](https://github.com/gptlabs/yield-stream) |
[**NPM**](https://npmjs.com/package/yield-stream) |
[**Docs**](https://yield-stream.vercel.app)

A small library for switching between streams, generators, and arrays.
A small library for switching between streams, generators, and arrays. See docs
for details.

### Note: Using `NodeJS.ReadableStream`

```ts
/**
 * `compose(f, g, h, ...)` returns a generator function `G(data)` that yields
 * all `(f · g · h · ...)(data)`.
 *
 * @note Used to compose multiple transforms into a `pipeline`.
 */
export const compose = <T>(
  ...generators: GeneratorFn<T>[]
): GeneratorFn<T> => {
  // Seed the fold with an identity transform so `compose()` with zero
  // generators is a valid no-op instead of throwing "Reduce of empty
  // array with no initial value".
  const identity: GeneratorFn<T> = async function* (data) {
    yield data;
  };

  return generators.reduce(
    (prev, next) => async function* (data) {
      // Feed each chunk produced by the composition so far into the next
      // transform, flattening its outputs.
      for await (const chunk of prev(data)) {
        yield* next(chunk);
      }
    },
    identity
  );
};

/**
* Accepts a stream and transforms and returns a stream of the transformed
* chunks. Transforms can yield multiple chunks per input chunk.
*/
export const pipeline = <T>(
stream: ReadableStream<T>,
...transforms: GeneratorFn<T>[]
): ReadableStream<T> => {
const composed = compose(...transforms);
return generateStream(
async function* () {
for await (const chunk of yieldStream(stream)) {
yield* composed(chunk);
}
}
);
};

/**
 * Accepts a stream and yields all of its chunks, stopping early if
 * `controller` is aborted.
 */
export const yieldStream = async function* <T>(
  stream: ReadableStream<T>,
  controller?: AbortController
) {
  const reader = stream.getReader();
  try {
    while (true) {
      if (controller?.signal.aborted) {
        break;
      }

      const { done, value } = await reader.read();
      if (done) {
        break;
      }

      yield value;
    }
  } finally {
    // Release the reader's lock so the stream remains usable after an
    // early exit (abort, consumer `break`, or a thrown error). Without
    // this the stream stays locked forever.
    reader.releaseLock();
  }
};
By default, this library uses WHATWG `ReadableStream`, which is only available
on Node 18+. If you are on an older version of Node or otherwise need to use
`NodeJS.ReadableStream`, import from:

/**
 * Accepts a generator function and streams its outputs.
 */
export const generateStream = <T, TReturn, D>(
  G: StreamGenerator<D, T, TReturn>,
  data?: D
): ReadableStream<T> => {
  return new ReadableStream<T>({
    async start(controller) {
      try {
        for await (const chunk of G(data)) {
          controller.enqueue(chunk);
        }
        controller.close();
      } catch (error) {
        // Surface generator failures as a stream error instead of an
        // unhandled promise rejection inside `start`.
        controller.error(error);
      }
    },
  });
};

/**
 * Accepts an array and returns a stream that emits one chunk per item.
 */
export const streamArray = <T>(array: T[]): ReadableStream<T> => {
  return generateStream(function* () {
    yield* array;
  });
};

/**
 * Accepts a stream and, after each incoming chunk, yields the array of
 * all chunks received so far (the same growing array instance each time).
 */
export const buffer = async function* <T>(stream: ReadableStream<T>) {
  const received: T[] = [];

  for await (const item of yieldStream(stream)) {
    received.push(item);
    yield received;
  }
};
```ts
import { yieldStream } from "yield-stream/node";
```
1 change: 1 addition & 0 deletions node.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./dist/platforms/node";
1 change: 1 addition & 0 deletions node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./dist/platforms/node.js";
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
},
"types": "dist/index.d.ts",
"files": [
"dist"
"dist",
"node.d.ts",
"node.js"
],
"exports": {
"./package.json": "./package.json",
"./node": "./node.js",
".": "./dist/index.js",
"./*": "./dist/*/index.js"
},
Expand Down Expand Up @@ -41,4 +44,4 @@
"shim-streams": "^0.0.2",
"web-streams-polyfill": "^3.2.1"
}
}
}
7 changes: 5 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
export * from "./lib";
export * from "./types";
/**
* `.` entrypoint is Edge by default. NodeJS.ReadableStream version available at
* `./node`.
*/
export * from "./platforms/edge";
65 changes: 22 additions & 43 deletions src/lib.ts → src/lib/edge.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import { GeneratorFn, StreamGenerator } from "./types";
import { GeneratorFn, StreamGenerator } from "../types";
import { compose, generateArray } from "./shared";

/**
* `compose(f, g, h, ...)` returns a generator function `G(data)` that yields
* all `(f · g · h · ...)(data)`.
*
* @note Used to compose multiple transforms into a `pipeline`.
* Accepts a generator function and streams its outputs.
*/
export const compose = <Chunk>(
...generators: GeneratorFn<Chunk>[]
): GeneratorFn<Chunk> => {
return generators.reduce(
(prev, next) => async function* (data) {
for await (const chunk of prev(data)) {
yield* next(chunk);
export const generateStream = <Chunk, Return, Data>(
G: StreamGenerator<Data, Chunk, Return>,
data?: Data
): ReadableStream<Chunk> => {
return new ReadableStream<Chunk>({
async start(controller) {
for await (const chunk of G(data)) {
controller.enqueue(chunk);
}
controller.close();
},
);
});
};

/**
Expand All @@ -36,6 +36,15 @@ export const pipeline = <Chunk>(
);
};

/**
 * Accepts an array and returns a stream emitting one chunk per item.
 */
export const streamArray = <Chunk>(
  array: Chunk[]
): ReadableStream<Chunk> => {
  const source = generateArray(array);
  return generateStream(source);
};

/**
* Accepts a stream and yields all of its chunks.
*/
Expand All @@ -58,36 +67,6 @@ export const yieldStream = async function* <Chunk>(
}
};

/**
* Accepts a generator function and streams its outputs.
*/
export const generateStream = <Chunk, Return, Data>(
G: StreamGenerator<Data, Chunk, Return>,
data?: Data
): ReadableStream<Chunk> => {
return new ReadableStream<Chunk>({
async start(controller) {
for await (const chunk of G(data)) {
controller.enqueue(chunk);
}
controller.close();
},
});
};

/**
* Accepts an array and returns a stream of its items.
*/
export const streamArray = <Chunk>(
array: Chunk[]
): ReadableStream<Chunk> => {
return generateStream(function* () {
for (const item of array) {
yield item;
}
});
};

/**
* Accepts a stream and yields a growing buffer of all chunks received.
*/
Expand Down
73 changes: 73 additions & 0 deletions src/lib/node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Readable } from "stream";
import { GeneratorFn, StreamGenerator } from "../types";
import { compose, generateArray } from "./shared";

/**
 * Wraps a generator function in a NodeJS.ReadableStream that emits each
 * yielded value as a chunk.
 */
export const generateStream = <Chunk, Return, Data>(
  G: StreamGenerator<Data, Chunk, Return>,
  data?: Data
): NodeJS.ReadableStream => {
  return Readable.from(G(data));
};


/**
 * Iterates a NodeJS.ReadableStream and yields every chunk it produces,
 * stopping early (without reading further) once `controller` is aborted.
 */
export const yieldStream = async function* <Chunk>(
  stream: NodeJS.ReadableStream,
  controller?: AbortController
): AsyncGenerator<Chunk> {
  for await (const piece of stream) {
    const shouldStop = controller?.signal.aborted ?? false;
    if (shouldStop) {
      return;
    }

    yield piece as Chunk;
  }
};

/**
* Accepts a stream and transforms and returns a stream of the transformed
* chunks. Transforms can yield multiple chunks per input chunk.
*/
export const pipeline = <Chunk>(
stream: NodeJS.ReadableStream,
...transforms: GeneratorFn<Chunk>[]
): NodeJS.ReadableStream => {
const composed = compose(...transforms);
return generateStream(
async function* () {
for await (const chunk of yieldStream<Chunk>(stream)) {
yield* composed(chunk);
}
}
);
};

/**
 * Converts an array into a NodeJS.ReadableStream that emits one chunk
 * per item.
 */
export const streamArray = <Chunk>(
  array: Chunk[]
): NodeJS.ReadableStream => {
  const source = generateArray(array);
  return generateStream(source);
};

/**
 * Consumes a stream and, after each incoming chunk, yields the array of
 * all chunks received so far (the same growing array instance each time).
 */
export const buffer = async function* <Chunk>(
  stream: NodeJS.ReadableStream
) {
  const received: Chunk[] = [];

  for await (const next of yieldStream<Chunk>(stream)) {
    received.push(next);
    yield received;
  }
};
32 changes: 32 additions & 0 deletions src/lib/shared.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { GeneratorFn } from "../types";

/**
 * `compose(f, g, h, ...)` returns a generator function `G(data)` that yields
 * all `(f · g · h · ...)(data)`.
 *
 * @note Used to compose multiple transforms into a `pipeline`.
 */
export const compose = <Chunk>(
  ...generators: GeneratorFn<Chunk>[]
): GeneratorFn<Chunk> => {
  // Identity transform used to seed the fold: `compose()` with zero
  // generators becomes a no-op rather than throwing "Reduce of empty
  // array with no initial value".
  const identity: GeneratorFn<Chunk> = async function* (data) {
    yield data;
  };

  return generators.reduce(
    (prev, next) => async function* (data) {
      // Feed each chunk produced by the composition so far into the next
      // transform, flattening its outputs.
      for await (const chunk of prev(data)) {
        yield* next(chunk);
      }
    },
    identity
  );
};

/**
 * Wraps an array in a synchronous generator function that yields each of
 * its items in order.
 */
export const generateArray = <Chunk>(
  array: Chunk[]
) => {
  return function* () {
    yield* array;
  };
};
Loading

1 comment on commit fabf6e6

@vercel
Copy link

@vercel vercel bot commented on fabf6e6 Mar 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

yield-stream – ./

yield-stream-gptlabs.vercel.app
yield-stream.vercel.app
yield-stream-git-master-gptlabs.vercel.app

Please sign in to comment.