From 9148e50a5ffe66c2f021f2f147074f62d3a9d4c3 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 29 Mar 2023 11:07:48 +0100 Subject: [PATCH 1/4] fix!: return iterators from synchronous sources Crossing async boundaries is not free. If a synchronous iterator is passed to a function, it should not force the caller to await on the result. This PR updates some of the basic `it-*` modules to not create unecessary async when they don't have to. The return types of these functions are now derived from the input types so if you have linting rules that forbid awaiting on non-thenables those awaits will need to be removed. BREAKING CHANGE: if you pass a synchronous iterator to a function it will return a synchronous generator in response --- packages/it-all/README.md | 20 ++- packages/it-all/src/index.ts | 23 ++- packages/it-all/test/index.spec.ts | 19 ++- packages/it-batch/README.md | 20 ++- packages/it-batch/src/index.ts | 74 ++++++++-- packages/it-batch/test/index.spec.ts | 39 +++-- packages/it-batched-bytes/README.md | 22 ++- packages/it-batched-bytes/package.json | 1 - packages/it-batched-bytes/src/index.ts | 134 +++++++++++------- packages/it-batched-bytes/test/index.spec.ts | 57 +++++--- packages/it-drain/README.md | 16 ++- packages/it-drain/package.json | 3 +- packages/it-drain/src/index.ts | 18 ++- packages/it-drain/test/index.spec.ts | 19 ++- packages/it-first/README.md | 18 ++- packages/it-first/src/index.ts | 21 ++- packages/it-first/test/index.spec.ts | 16 ++- packages/it-last/README.md | 18 ++- packages/it-last/src/index.ts | 24 +++- packages/it-last/test/index.spec.ts | 16 ++- packages/it-length/README.md | 18 ++- packages/it-length/src/index.ts | 32 ++++- packages/it-length/test/index.spec.ts | 16 ++- packages/it-parallel-batch/test/index.spec.ts | 15 +- packages/it-peekable/README.md | 25 +++- packages/it-peekable/src/index.ts | 10 +- packages/it-skip/README.md | 19 ++- packages/it-skip/src/index.ts | 37 ++++- packages/it-skip/test/index.spec.ts | 16 ++- packages/it-sort/README.md | 21 ++- packages/it-sort/src/index.ts | 24 +++- packages/it-sort/test/index.spec.ts | 21 ++- packages/it-take/README.md | 19 ++- packages/it-take/src/index.ts | 49 +++++-- packages/it-take/test/index.spec.ts | 18 ++- packages/it-to-buffer/README.md | 19 ++- packages/it-to-buffer/src/index.ts | 32 ++++- packages/it-to-buffer/test/index.spec.ts | 25 +++- 38 files changed, 801 insertions(+), 193 deletions(-) diff --git a/packages/it-all/README.md b/packages/it-all/README.md index cf5c4c2c..58470011 100644 --- a/packages/it-all/README.md +++ b/packages/it-all/README.md @@ -34,10 +34,24 @@ For when you need a one-liner to collect iterable values. ```javascript import all from 'it-all' -// This can also be an iterator, async iterator, generator, etc -const values = [0, 1, 2, 3, 4] +// This can also be an iterator, etc +const values = function * () { + yield * [0, 1, 2, 3, 4] +} -const arr = await all(values) +const arr = all(values) + +console.info(arr) // 0, 1, 2, 3, 4 +``` + +Async sources must be awaited: + +```javascript +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const arr = await all(values()) console.info(arr) // 0, 1, 2, 3, 4 ``` diff --git a/packages/it-all/src/index.ts b/packages/it-all/src/index.ts index e6e2a63e..39086678 100644 --- a/packages/it-all/src/index.ts +++ b/packages/it-all/src/index.ts @@ -1,13 +1,32 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} /** * Collects all values from an (async) iterable and returns them as an array */ -export default async function all (source: AsyncIterable | Iterable): Promise { +function all (source: Iterable): T[] +function all (source: AsyncIterable): Promise +function all (source: AsyncIterable | Iterable): Promise | T[] { + if (isAsyncIterable(source)) { + return (async () => { + const arr = [] + + for await (const entry of source) { + arr.push(entry) + } + + return arr + })() + } + const arr = [] - for await (const entry of source) { + for (const entry of source) { arr.push(entry) } return arr } + +export default all diff --git a/packages/it-all/test/index.spec.ts b/packages/it-all/test/index.spec.ts index b85f5487..d66cf5f9 100644 --- a/packages/it-all/test/index.spec.ts +++ b/packages/it-all/test/index.spec.ts @@ -4,11 +4,26 @@ import { expect } from 'aegir/chai' import all from '../src/index.js' describe('it-all', () => { - it('Should collect all entries of an async iterator as an array', async () => { + it('should collect all entries of an iterator as an array', () => { const values = [0, 1, 2, 3, 4] - const res = await all(values) + const res = all(values) + expect(res).to.not.have.property('then') + expect(res).to.deep.equal(values) + }) + + it('should collect all entries of an async iterator as an array', async () => { + const values = [0, 1, 2, 3, 4] + + const generator = (async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + })() + + const p = all(generator) + expect(p).to.have.property('then').that.is.a('function') + + const res = await p expect(res).to.deep.equal(values) }) }) diff --git a/packages/it-batch/README.md b/packages/it-batch/README.md index cfe7cc72..5dc42b6f 100644 --- a/packages/it-batch/README.md +++ b/packages/it-batch/README.md @@ -35,11 +35,27 @@ The final batch may be smaller than the max. import batch from 'it-batch' import all from 'it-all' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] const batchSize = 2 -const result = await all(batch(values, batchSize)) +const result = all(batch(values, batchSize)) + +console.info(result) // [0, 1], [2, 3], [4] +``` + +Async sources must be awaited: + +```javascript +import batch from 'it-batch' +import all from 'it-all' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} +const batchSize = 2 + +const result = await all(batch(values(), batchSize)) console.info(result) // [0, 1], [2, 3], [4] ``` diff --git a/packages/it-batch/src/index.ts b/packages/it-batch/src/index.ts index fbbc5171..9af276ea 100644 --- a/packages/it-batch/src/index.ts +++ b/packages/it-batch/src/index.ts @@ -1,27 +1,73 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Takes an (async) iterable that emits things and returns an async iterable that * emits those things in fixed-sized batches */ -export default async function * batch (source: AsyncIterable | Iterable, size: number = 1): AsyncGenerator { - let things: T[] = [] +function batch (source: Iterable, size?: number): Generator +function batch (source: AsyncIterable | Iterable, size?: number): AsyncGenerator +function batch (source: AsyncIterable | Iterable, size: number = 1): Generator | AsyncGenerator { + size = Number(size) + + if (isAsyncIterable(source)) { + return (async function * () { + let things: T[] = [] + + if (size < 1) { + size = 1 + } + + if (size !== Math.round(size)) { + throw new Error('Batch size must be an integer') + } + + for await (const thing of source) { + things.push(thing) - if (size < 1) { - size = 1 + while (things.length >= size) { + yield things.slice(0, size) + + things = things.slice(size) + } + } + + while (things.length > 0) { + yield things.slice(0, size) + + things = things.slice(size) + } + }()) } - for await (const thing of source) { - things.push(thing) + return (function * () { + let things: T[] = [] - while (things.length >= size) { - yield things.slice(0, size) + if (size < 1) { + size = 1 + } - things = things.slice(size) + if (size !== Math.round(size)) { + throw new Error('Batch size must be an integer') } - } - while (things.length > 0) { - yield things.slice(0, size) + for (const thing of source) { + things.push(thing) - things = things.slice(size) - } + while (things.length >= size) { + yield things.slice(0, size) + + things = things.slice(size) + } + } + + while (things.length > 0) { + yield things.slice(0, size) + + things = things.slice(size) + } + }()) } + +export default batch diff --git a/packages/it-batch/test/index.spec.ts b/packages/it-batch/test/index.spec.ts index a9bbd23d..4e662481 100644 --- a/packages/it-batch/test/index.spec.ts +++ b/packages/it-batch/test/index.spec.ts @@ -5,51 +5,64 @@ import { expect } from 'aegir/chai' import all from 'it-all' describe('it-batch', () => { - it('should batch up entries', async () => { + it('should batch up entries', () => { const values = [0, 1, 2, 3, 4] const batchSize = 2 - const res = await all(batch(values, batchSize)) + const gen = batch(values, batchSize) + expect(gen[Symbol.iterator]).to.be.ok() + const res = all(gen) expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) }) - it('should batch up entries without batch size', async () => { + it('should batch up async iterator of entries', async () => { + const values = async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + } + const batchSize = 2 + const gen = batch(values(), batchSize) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) + expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) + }) + + it('should batch up entries without batch size', () => { const values = [0, 1, 2, 3, 4] - const res = await all(batch(values)) + const res = all(batch(values)) expect(res).to.deep.equal([[0], [1], [2], [3], [4]]) }) - it('should batch up entries with negative batch size', async () => { + it('should batch up entries with negative batch size', () => { const values = [0, 1, 2, 3, 4] const batchSize = -1 - const res = await all(batch(values, batchSize)) + const res = all(batch(values, batchSize)) expect(res).to.deep.equal([[0], [1], [2], [3], [4]]) }) - it('should batch up entries with zero batch size', async () => { + it('should batch up entries with zero batch size', () => { const values = [0, 1, 2, 3, 4] const batchSize = 0 - const res = await all(batch(values, batchSize)) + const res = all(batch(values, batchSize)) expect(res).to.deep.equal([[0], [1], [2], [3], [4]]) }) - it('should batch up entries with string batch size', async () => { + it('should batch up entries with string batch size', () => { const values = [0, 1, 2, 3, 4] const batchSize = '2' // @ts-expect-error batchSize type is incorrect - const res = await all(batch(values, batchSize)) + const res = all(batch(values, batchSize)) expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) }) - it('should batch up entries with non-integer batch size', async () => { + it('should throw when batching up entries with non-integer batch size', () => { const values = [0, 1, 2, 3, 4] const batchSize = 2.5 - const res = await all(batch(values, batchSize)) - expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) + expect(() => all(batch(values, batchSize))).to.throw('Batch size must be an integer') }) }) diff --git a/packages/it-batched-bytes/README.md b/packages/it-batched-bytes/README.md index 8f4e2dd8..4f02a8cb 100644 --- a/packages/it-batched-bytes/README.md +++ b/packages/it-batched-bytes/README.md @@ -35,7 +35,7 @@ The final batch may be smaller than the max. import batch from 'it-batched-bytes' import all from 'it-all' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [ Uint8Array.from([0]), Uint8Array.from([1]), @@ -45,6 +45,26 @@ const values = [ ] const batchSize = 2 +const result = all(batch(values, { size: batchSize })) + +console.info(result) // [0, 1], [2, 3], [4] +``` + +Async sources must be awaited: + +```javascript +import batch from 'it-batched-bytes' +import all from 'it-all' + +const values = async function * () { + yield Uint8Array.from([0]) + yield Uint8Array.from([1]) + yield Uint8Array.from([2]) + yield Uint8Array.from([3]) + yield Uint8Array.from([4]) +} +const batchSize = 2 + const result = await all(batch(values, { size: batchSize })) console.info(result) // [0, 1], [2, 3], [4] diff --git a/packages/it-batched-bytes/package.json b/packages/it-batched-bytes/package.json index 9f90b8fd..88e77daf 100644 --- a/packages/it-batched-bytes/package.json +++ b/packages/it-batched-bytes/package.json @@ -135,7 +135,6 @@ "release": "aegir release" }, "dependencies": { - "it-stream-types": "^1.0.4", "p-defer": "^4.0.0", "uint8arraylist": "^2.4.1" }, diff --git a/packages/it-batched-bytes/src/index.ts b/packages/it-batched-bytes/src/index.ts index ab1a6c9e..29007930 100644 --- a/packages/it-batched-bytes/src/index.ts +++ b/packages/it-batched-bytes/src/index.ts @@ -1,6 +1,9 @@ import { Uint8ArrayList } from 'uint8arraylist' import defer from 'p-defer' -import type { Source } from 'it-stream-types' + +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} const DEFAULT_BATCH_SIZE = 1024 * 1024 const DEFAULT_SERIALIZE = (buf: Uint8Array | Uint8ArrayList, list: Uint8ArrayList): void => { list.append(buf) } @@ -12,29 +15,18 @@ export interface BatchedBytesOptions { size?: number /** - * If this amount of time passes, yield all the bytes in the batch even - * if they are below `size` (default: 0 - e.g. on every tick) + * If passed, this function should serialize the object and append the + * result to the passed list */ - yieldAfter?: number + serialize?: (object: Uint8Array | Uint8ArrayList, list: Uint8ArrayList) => void } -export interface BatchedOptions { - /** - * The minimum number of bytes that should be in a batch (default: 1MB) - */ - size?: number - +export interface AsyncBatchedBytesOptions extends BatchedBytesOptions { /** * If this amount of time passes, yield all the bytes in the batch even * if they are below `size` (default: 0 - e.g. on every tick) */ yieldAfter?: number - - /** - * If passed, this function should serialize the object and append the - * result to the passed list - */ - serialize: (object: T, list: Uint8ArrayList) => void } /** @@ -42,58 +34,94 @@ export interface BatchedOptions { * an internal buffer. Either once the buffer reaches the requested size * or the next event loop tick occurs, yield any bytes from the buffer. */ -function batchedBytes (source: Source, options?: BatchedBytesOptions): Source -function batchedBytes (source: Source, options: BatchedOptions): Source -async function * batchedBytes (source: Source, options: any = {}): any { - let buffer = new Uint8ArrayList() - let ended = false - let deferred = defer() +function batchedBytes (source: Iterable, options?: BatchedBytesOptions): Iterable +function batchedBytes (source: AsyncIterable, options?: AsyncBatchedBytesOptions): AsyncIterable +function batchedBytes (source: Iterable | AsyncIterable, options: AsyncBatchedBytesOptions = {}): AsyncIterable | Iterable { + if (isAsyncIterable(source)) { + return (async function * () { + let buffer = new Uint8ArrayList() + let ended = false + let deferred = defer() + + let size = Number(options.size ?? DEFAULT_BATCH_SIZE) + + if (isNaN(size) || size === 0 || size < 0) { + size = DEFAULT_BATCH_SIZE + } - let size = Number(options.size ?? DEFAULT_BATCH_SIZE) + if (size !== Math.round(size)) { + throw new Error('Batch size must be an integer') + } - if (isNaN(size) || size === 0 || size < 0) { - size = DEFAULT_BATCH_SIZE - } + const yieldAfter = options.yieldAfter ?? 0 + const serialize = options.serialize ?? DEFAULT_SERIALIZE - const yieldAfter = options.yieldAfter ?? 0 - const serialize = options.serialize ?? DEFAULT_SERIALIZE + void Promise.resolve().then(async () => { + try { + let timeout - void Promise.resolve().then(async () => { - try { - let timeout + for await (const buf of source) { + serialize(buf, buffer) - for await (const buf of source) { - serialize(buf, buffer) + if (buffer.byteLength >= size) { + clearTimeout(timeout) + deferred.resolve() + continue + } + + timeout = setTimeout(() => { // eslint-disable-line no-loop-func + deferred.resolve() + }, yieldAfter) + } - if (buffer.byteLength >= size) { clearTimeout(timeout) deferred.resolve() - continue + } catch (err) { + deferred.reject(err) + } finally { + ended = true + } + }) + + while (!ended) { // eslint-disable-line no-unmodified-loop-condition + await deferred.promise + deferred = defer() + if (buffer.byteLength > 0) { + const b = buffer + buffer = new Uint8ArrayList() + yield b.subarray() } - - timeout = setTimeout(() => { // eslint-disable-line no-loop-func - deferred.resolve() - }, yieldAfter) } + })() + } + + return (function * () { + const buffer = new Uint8ArrayList() + let size = Number(options.size ?? DEFAULT_BATCH_SIZE) - clearTimeout(timeout) - deferred.resolve() - } catch (err) { - deferred.reject(err) - } finally { - ended = true + if (isNaN(size) || size === 0 || size < 0) { + size = DEFAULT_BATCH_SIZE + } + + if (size !== Math.round(size)) { + throw new Error('Batch size must be an integer') + } + + const serialize = options.serialize ?? DEFAULT_SERIALIZE + + for (const buf of source) { + serialize(buf, buffer) + + if (buffer.byteLength >= size) { + yield buffer.subarray(0, size) + buffer.consume(size) + } } - }) - while (!ended) { // eslint-disable-line no-unmodified-loop-condition - await deferred.promise - deferred = defer() if (buffer.byteLength > 0) { - const b = buffer - buffer = new Uint8ArrayList() - yield b.subarray() + yield buffer.subarray() } - } + })() } export default batchedBytes diff --git a/packages/it-batched-bytes/test/index.spec.ts b/packages/it-batched-bytes/test/index.spec.ts index 1baa79ea..4d2c3556 100644 --- a/packages/it-batched-bytes/test/index.spec.ts +++ b/packages/it-batched-bytes/test/index.spec.ts @@ -5,7 +5,7 @@ import { expect } from 'aegir/chai' import all from 'it-all' describe('it-batched-bytes', () => { - it('should batch up entries', async () => { + it('should batch up entries', () => { const values = [ Uint8Array.of(0), Uint8Array.of(1), @@ -14,12 +14,30 @@ describe('it-batched-bytes', () => { Uint8Array.of(4) ] const batchSize = 2 - const res = await all(batch(values, { size: batchSize })) + const gen = batch(values, { size: batchSize }) + expect(gen[Symbol.iterator]).to.be.ok() + const res = all(gen) expect(res).to.deep.equal([Uint8Array.of(0, 1), Uint8Array.of(2, 3), Uint8Array.of(4)]) }) - it('should batch up entries without batch size', async () => { + it('should batch up an async iterator of entries', async () => { + const values = async function * (): AsyncGenerator { + yield Uint8Array.of(0) + yield Uint8Array.of(1) + yield Uint8Array.of(2) + yield Uint8Array.of(3) + yield Uint8Array.of(4) + } + const batchSize = 2 + const gen = batch(values(), { size: batchSize }) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) + expect(res).to.deep.equal([Uint8Array.of(0, 1), Uint8Array.of(2, 3), Uint8Array.of(4)]) + }) + + it('should batch up entries without batch size', () => { const values = [ Uint8Array.of(0), Uint8Array.of(1), @@ -27,12 +45,12 @@ describe('it-batched-bytes', () => { Uint8Array.of(3), Uint8Array.of(4) ] - const res = await all(batch(values)) + const res = all(batch(values)) expect(res).to.deep.equal([Uint8Array.of(0, 1, 2, 3, 4)]) }) - it('should batch up entries with negative batch size', async () => { + it('should batch up entries with negative batch size', () => { const values = [ Uint8Array.of(0), Uint8Array.of(1), @@ -41,12 +59,12 @@ describe('it-batched-bytes', () => { Uint8Array.of(4) ] const batchSize = -1 - const res = await all(batch(values, { size: batchSize })) + const res = all(batch(values, { size: batchSize })) expect(res).to.deep.equal([Uint8Array.of(0, 1, 2, 3, 4)]) }) - it('should batch up entries with zero batch size', async () => { + it('should batch up entries with zero batch size', () => { const values = [ Uint8Array.of(0), Uint8Array.of(1), @@ -55,12 +73,12 @@ describe('it-batched-bytes', () => { Uint8Array.of(4) ] const batchSize = 0 - const res = await all(batch(values, { size: batchSize })) + const res = all(batch(values, { size: batchSize })) expect(res).to.deep.equal([Uint8Array.of(0, 1, 2, 3, 4)]) }) - it('should batch up entries with string batch size', async () => { + it('should batch up entries with string batch size', () => { const values = [ Uint8Array.of(0), Uint8Array.of(1), @@ -70,12 +88,12 @@ describe('it-batched-bytes', () => { ] const batchSize = '2' // @ts-expect-error batchSize type is incorrect - const res = await all(batch(values, { size: batchSize })) + const res = all(batch(values, { size: batchSize })) expect(res).to.deep.equal([Uint8Array.of(0, 1), Uint8Array.of(2, 3), Uint8Array.of(4)]) }) - it('should batch up entries with non-integer batch size', async () => { + it('should throw when batching up entries with non-integer batch size', () => { const values = [ Uint8Array.of(0), Uint8Array.of(1), @@ -84,17 +102,22 @@ describe('it-batched-bytes', () => { Uint8Array.of(4) ] const batchSize = 2.5 - const res = await all(batch(values, { size: batchSize })) - expect(res).to.deep.equal([Uint8Array.of(0, 1, 2), Uint8Array.of(3, 4)]) + expect(() => all(batch(values, { size: batchSize }))).to.throw('Batch size must be an integer') }) it('should batch up values that need serializing', async () => { - const values = [0, 1, 2, 3, 4] - const batchSize = 2.5 - const res = await all(batch(values, { + const values = [ + Uint8Array.of(0), + Uint8Array.of(1), + Uint8Array.of(2), + Uint8Array.of(3), + Uint8Array.of(4) + ] + const batchSize = 3 + const res = all(batch(values, { size: batchSize, - serialize: (obj, list) => { list.append(Uint8Array.of(obj)) } + serialize: (obj, list) => { list.append(obj) } })) expect(res).to.deep.equal([Uint8Array.of(0, 1, 2), Uint8Array.of(3, 4)]) diff --git a/packages/it-drain/README.md b/packages/it-drain/README.md index b4a8cef9..675fe337 100644 --- a/packages/it-drain/README.md +++ b/packages/it-drain/README.md @@ -34,10 +34,22 @@ Mostly useful for tests or when you want to be explicit about consuming an itera ```javascript import drain from 'it-drain' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -await drain(values) +drain(values) +``` + +Async sources must be awaited: + +```javascript +import drain from 'it-drain' + +const values = async function * { + yield * [0, 1, 2, 3, 4] +} + +await drain(values()) ``` ## License diff --git a/packages/it-drain/package.json b/packages/it-drain/package.json index 6b84e58a..ecffb17e 100644 --- a/packages/it-drain/package.json +++ b/packages/it-drain/package.json @@ -135,6 +135,7 @@ "release": "aegir release" }, "devDependencies": { - "aegir": "^38.1.7" + "aegir": "^38.1.7", + "delay": "^5.0.0" } } diff --git a/packages/it-drain/src/index.ts b/packages/it-drain/src/index.ts index 66c5e43c..c2ea2979 100644 --- a/packages/it-drain/src/index.ts +++ b/packages/it-drain/src/index.ts @@ -1,7 +1,21 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Drains an (async) iterable discarding its' content and does not return * anything */ -export default async function drain (source: AsyncIterable | Iterable): Promise { - for await (const _ of source) { } // eslint-disable-line no-unused-vars,no-empty,@typescript-eslint/no-unused-vars +function drain (source: Iterable): void +function drain (source: AsyncIterable): Promise +function drain (source: AsyncIterable | Iterable): Promise | void { + if (isAsyncIterable(source)) { + return (async () => { + for await (const _ of source) { } // eslint-disable-line no-unused-vars,no-empty,@typescript-eslint/no-unused-vars + })() + } else { + for (const _ of source) { } // eslint-disable-line no-unused-vars,no-empty,@typescript-eslint/no-unused-vars + } } + +export default drain diff --git a/packages/it-drain/test/index.spec.ts b/packages/it-drain/test/index.spec.ts index a68a7d07..61b201a3 100644 --- a/packages/it-drain/test/index.spec.ts +++ b/packages/it-drain/test/index.spec.ts @@ -1,12 +1,15 @@ import { expect } from 'aegir/chai' +import delay from 'delay' import drain from '../src/index.js' describe('it-drain', () => { it('should empty an async iterator', async () => { let done = false - const iter = function * (): Generator { + const iter = async function * (): AsyncGenerator { yield 1 + await delay(1) yield 2 + await delay(1) yield 3 done = true } @@ -15,4 +18,18 @@ describe('it-drain', () => { expect(done).to.be.true() }) + + it('should empty an iterator', () => { + let done = false + const iter = function * (): Generator { + yield 1 + yield 2 + yield 3 + done = true + } + + drain(iter()) + + expect(done).to.be.true() + }) }) diff --git a/packages/it-first/README.md b/packages/it-first/README.md index fd7dd57d..e226cc52 100644 --- a/packages/it-first/README.md +++ b/packages/it-first/README.md @@ -34,10 +34,24 @@ Mostly useful for tests. ```javascript import first from 'it-first' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -const res = await first(values) +const res = first(values) + +console.info(res) // 0 +``` + +Async sources must be awaited: + +```javascript +import first from 'it-first' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const res = await first(values()) console.info(res) // 0 ``` diff --git a/packages/it-first/src/index.ts b/packages/it-first/src/index.ts index 07e02fa4..02f5b53c 100644 --- a/packages/it-first/src/index.ts +++ b/packages/it-first/src/index.ts @@ -1,12 +1,29 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} /** * Returns the first result from an (async) iterable, unless empty, in which * case returns `undefined` */ -export default async function first (source: AsyncIterable | Iterable): Promise { - for await (const entry of source) { // eslint-disable-line no-unreachable-loop +function first (source: Iterable): T | undefined +function first (source: AsyncIterable): Promise +function first (source: AsyncIterable | Iterable): Promise | T | undefined { + if (isAsyncIterable(source)) { + return (async () => { + for await (const entry of source) { // eslint-disable-line no-unreachable-loop + return entry + } + + return undefined + })() + } + + for (const entry of source) { // eslint-disable-line no-unreachable-loop return entry } return undefined } + +export default first diff --git a/packages/it-first/test/index.spec.ts b/packages/it-first/test/index.spec.ts index b86a91cb..5f66cf83 100644 --- a/packages/it-first/test/index.spec.ts +++ b/packages/it-first/test/index.spec.ts @@ -2,11 +2,23 @@ import { expect } from 'aegir/chai' import first from '../src/index.js' describe('it-first', () => { - it('should return only the first result from an async iterator', async () => { + it('should return only the first result from an iterator', () => { const values = [0, 1, 2, 3, 4] - const res = await first(values) + const res = first(values) + expect(res).to.not.have.property('then') expect(res).to.equal(0) }) + + it('should return only the first result from an async iterator', async () => { + const values = (async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + }()) + + const res = first(values) + + expect(res).to.have.property('then') + await expect(res).to.eventually.equal(0) + }) }) diff --git a/packages/it-last/README.md b/packages/it-last/README.md index caa194a2..72480424 100644 --- a/packages/it-last/README.md +++ b/packages/it-last/README.md @@ -34,10 +34,24 @@ Mostly useful for tests. ```javascript import last from 'it-last' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -const res = await last(values) +const res = last(values) + +console.info(res) // 4 +``` + +Async sources must be awaited: + +```javascript +import last from 'it-last' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const res = await last(values()) console.info(res) // 4 ``` diff --git a/packages/it-last/src/index.ts b/packages/it-last/src/index.ts index 693265fb..4b855a3d 100644 --- a/packages/it-last/src/index.ts +++ b/packages/it-last/src/index.ts @@ -1,13 +1,33 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Returns the last item of an (async) iterable, unless empty, in which case * return `undefined` */ -export default async function last (source: AsyncIterable | Iterable): Promise { +function last (source: Iterable): T | undefined +function last (source: AsyncIterable): Promise +function last (source: AsyncIterable | Iterable): Promise | T | undefined { + if (isAsyncIterable(source)) { + return (async () => { + let res + + for await (const entry of source) { + res = entry + } + + return res + })() + } + let res - for await (const entry of source) { + for (const entry of source) { res = entry } return res } + +export default last diff --git a/packages/it-last/test/index.spec.ts b/packages/it-last/test/index.spec.ts index dfd1f013..3341c2ac 100644 --- a/packages/it-last/test/index.spec.ts +++ b/packages/it-last/test/index.spec.ts @@ -2,14 +2,26 @@ import { expect } from 'aegir/chai' import last from '../src/index.js' describe('it-last', () => { - it('should return only the last result from an async iterator', async () => { + it('should return only the last result from an iterator', async () => { const values = [0, 1, 2, 3, 4] - const res = await last(values) + const res = last(values) + expect(res).to.not.have.property('then') expect(res).to.equal(4) }) + it('should return only the last result from an async iterator', async () => { + const values = (async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + }()) + + const res = last(values) + + expect(res).to.have.property('then') + await expect(res).to.eventually.equal(4) + }) + it('should return undefined if the async iterator was empty', async () => { const values: any[] = [] diff --git a/packages/it-length/README.md b/packages/it-length/README.md index 3b3a3c43..b81bcc9c 100644 --- a/packages/it-length/README.md +++ b/packages/it-length/README.md @@ -34,10 +34,24 @@ N.b. will consume the iterable ```javascript import length from 'it-length' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -const res = await length(values) +const res = length(values) + +console.info(res) // 5 +``` + +Async sources must be awaited: + +```javascript +import length from 'it-length' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const res = await length(values()) console.info(res) // 5 ``` diff --git a/packages/it-length/src/index.ts b/packages/it-length/src/index.ts index 7eb21b2f..1751a508 100644 --- a/packages/it-length/src/index.ts +++ b/packages/it-length/src/index.ts @@ -1,12 +1,32 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Consumes the passed iterator and returns the number of items it contained */ -export default async function length (iterator: AsyncIterable | Iterable): Promise { - let count = 0 +function length (source: Iterable): number +function length (source: AsyncIterable): Promise +function length (source: AsyncIterable | Iterable): Promise | number { + if (isAsyncIterable(source)) { + return (async () => { + let count = 0 - for await (const _ of iterator) { // eslint-disable-line no-unused-vars,@typescript-eslint/no-unused-vars - count++ - } + for await (const _ of source) { // eslint-disable-line no-unused-vars,@typescript-eslint/no-unused-vars + count++ + } + + return count + })() + } else { + let count = 0 - return count + for (const _ of source) { // eslint-disable-line no-unused-vars,@typescript-eslint/no-unused-vars + count++ + } + + return count + } } + +export default length diff --git a/packages/it-length/test/index.spec.ts b/packages/it-length/test/index.spec.ts index cf3cdac6..7e580f06 100644 --- a/packages/it-length/test/index.spec.ts +++ b/packages/it-length/test/index.spec.ts @@ -2,11 +2,23 @@ import { expect } from 'aegir/chai' import length from '../src/index.js' describe('it-length', () => { - it('should count the items in an async iterator', async () => { + it('should count the items in an iterator', () => { const values = [0, 1, 2, 3, 4] - const res = await length(values) + const res = length(values) + expect(res).to.not.have.property('then') expect(res).to.equal(5) }) + + it('should count the items in an async iterator', async () => { + const values = (async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + }()) + + const res = length(values) + + expect(res).to.have.property('then') + await expect(res).to.eventually.equal(5) + }) }) diff --git a/packages/it-parallel-batch/test/index.spec.ts b/packages/it-parallel-batch/test/index.spec.ts index a5dbde1b..688f57cc 100644 --- a/packages/it-parallel-batch/test/index.spec.ts +++ b/packages/it-parallel-batch/test/index.spec.ts @@ -190,23 +190,22 @@ describe('it-parallel-batch', () => { expect(res).to.deep.equal([1, 2]) }) - it('should batch up entries with non-integer batch size', async () => { - const input = [ - async () => { + it('should throw when batching up entries with non-integer batch size', async () => { + const input = async function * (): AsyncGenerator<() => Promise, void, undefined> { + yield async () => { await delay(200) return 1 - }, - async () => { + } + yield async () => { await delay(100) return 2 } - ] + } const batchSize = 2.5 - const res = await all(parallelBatch(input, batchSize)) - expect(res).to.deep.equal([1, 2]) + await expect(all(parallelBatch(input(), batchSize))).to.eventually.be.rejectedWith('Batch size must be an integer') }) it('should allow returning errors', async () => { diff --git a/packages/it-peekable/README.md b/packages/it-peekable/README.md index 5ba80a56..3440ae8a 100644 --- a/packages/it-peekable/README.md +++ b/packages/it-peekable/README.md @@ -34,12 +34,12 @@ Lets you look at the contents of an async iterator and decide what to do ```javascript import peekable from 'it-peekable' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] const it = peekable(value) -const first = await it.peek() +const first = it.peek() console.info(first) // 0 @@ -49,6 +49,27 @@ console.info([...it]) // [ 0, 1, 2, 3, 4 ] ``` +Async sources must be awaited: + +```javascript +import peekable from 'it-peekable' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const it = peekable(values()) + +const first = await it.peek() + +console.info(first) // 0 + +it.push(first) + +console.info(await all(it)) +// [ 0, 1, 2, 3, 4 ] +``` + ## License Licensed under either of diff --git a/packages/it-peekable/src/index.ts b/packages/it-peekable/src/index.ts index 9fc15f88..02fb79ca 100644 --- a/packages/it-peekable/src/index.ts +++ b/packages/it-peekable/src/index.ts @@ -15,11 +15,9 @@ type Peekable = Iterable & Peek & Push & Iterator type AsyncPeekable = AsyncIterable & AsyncPeek & Push & AsyncIterator -export default function peekableIterator | AsyncIterable> (iterable: I): I extends Iterable - ? Peekable - : I extends AsyncIterable - ? AsyncPeekable - : never { +function peekable (iterable: Iterable): Peekable +function peekable (iterable: AsyncIterable): AsyncPeekable +function peekable (iterable: Iterable | AsyncIterable): Peekable | AsyncPeekable { // @ts-expect-error const [iterator, symbol] = iterable[Symbol.asyncIterator] != null // @ts-expect-error @@ -52,3 +50,5 @@ export default function peekableIterator | AsyncIterable } } } + +export default peekable diff --git a/packages/it-skip/README.md b/packages/it-skip/README.md index 1ee0cc48..54898609 100644 --- a/packages/it-skip/README.md +++ b/packages/it-skip/README.md @@ -35,10 +35,25 @@ For when you are only interested in later values from an iterable. import take from 'it-skip' import all from 'it-all' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -const arr = await all(skip(values, 2)) +const arr = all(skip(values, 2)) + +console.info(arr) // 2, 3, 4 +``` + +Async sources must be awaited: + +```javascript +import take from 'it-skip' +import all from 'it-all' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const arr = await all(skip(values(), 2)) console.info(arr) // 2, 3, 4 ``` diff --git a/packages/it-skip/src/index.ts b/packages/it-skip/src/index.ts index 630ab762..e4a129ab 100644 --- a/packages/it-skip/src/index.ts +++ b/packages/it-skip/src/index.ts @@ -1,15 +1,38 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} /** * Skip items from an iterable */ -export default async function * skip (source: AsyncIterable | Iterable, offset: number): AsyncGenerator { - for await (const entry of source) { - if (offset === 0) { - yield entry +function skip (source: Iterable, offset: number): Generator +function skip (source: AsyncIterable, offset: number): AsyncGenerator +function skip (source: AsyncIterable | Iterable, offset: number): AsyncGenerator | Generator { + if (isAsyncIterable(source)) { + return (async function * () { + for await (const entry of source) { + if (offset === 0) { + yield entry - continue - } + continue + } - offset-- + offset-- + } + })() } + + return (function * () { + for (const entry of source) { + if (offset === 0) { + yield entry + + continue + } + + offset-- + } + })() } + +export default skip diff --git a/packages/it-skip/test/index.spec.ts b/packages/it-skip/test/index.spec.ts index 14f642cb..363e8a1a 100644 --- a/packages/it-skip/test/index.spec.ts +++ b/packages/it-skip/test/index.spec.ts @@ -6,8 +6,22 @@ describe('it-skip', () => { it('should skip values from an iterable', async () => { const values = [0, 1, 2, 3, 4] - const res = await all(skip(values, 2)) + const gen = skip(values, 2) + expect(gen[Symbol.iterator]).to.be.ok() + const res = all(gen) + expect(res).to.deep.equal([2, 3, 4]) + }) + + it('should skip values from an async iterable', async () => { + const values = async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + } + + const gen = skip(values(), 2) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) expect(res).to.deep.equal([2, 3, 4]) }) }) diff --git a/packages/it-sort/README.md b/packages/it-sort/README.md index 06448cdd..c15a6dfc 100644 --- a/packages/it-sort/README.md +++ b/packages/it-sort/README.md @@ -37,9 +37,28 @@ const sorter = (a, b) => { return a.localeCompare(b) } -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = ['foo', 'bar'] +const arr = all(sort(values, sorter)) + +console.info(arr) // 'bar', 'foo' +``` + +Async sources must be awaited: + +```javascript +import sort from 'it-sort' +import all from 'it-all' + +const sorter = (a, b) => { + return a.localeCompare(b) +} + +const values = async function * () { + yield * ['foo', 'bar'] +} + const arr = await all(sort(values, sorter)) console.info(arr) // 'bar', 'foo' diff --git a/packages/it-sort/src/index.ts b/packages/it-sort/src/index.ts index 886c1287..43f0acff 100644 --- a/packages/it-sort/src/index.ts +++ b/packages/it-sort/src/index.ts @@ -1,5 +1,9 @@ import all from 'it-all' +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + export interface CompareFunction { (a: T, b: T): number } @@ -8,8 +12,22 @@ export interface CompareFunction { * Collects all values from an async iterator, sorts them * using the passed function and yields them */ -export default async function * sort (source: AsyncIterable | Iterable, sorter: CompareFunction): AsyncGenerator { - const arr = await all(source) +function sort (source: Iterable, sorter: CompareFunction): Generator +function sort (source: AsyncIterable, sorter: CompareFunction): AsyncGenerator +function sort (source: AsyncIterable | Iterable, sorter: CompareFunction): AsyncGenerator | Generator { + if (isAsyncIterable(source)) { + return (async function * () { + const arr = await all(source) + + yield * arr.sort(sorter) + })() + } - yield * arr.sort(sorter) + return (function * () { + const arr = all(source) + + yield * arr.sort(sorter) + })() } + +export default sort diff --git a/packages/it-sort/test/index.spec.ts b/packages/it-sort/test/index.spec.ts index 4321a978..f4383f69 100644 --- a/packages/it-sort/test/index.spec.ts +++ b/packages/it-sort/test/index.spec.ts @@ -3,14 +3,31 @@ import all from 'it-all' import sort, { CompareFunction } from '../src/index.js' describe('it-sort', () => { - it('should sort all entries of an array', async () => { + it('should sort all entries of an iterator', () => { const values = ['foo', 'bar'] const sorter: CompareFunction = (a, b) => { return a.localeCompare(b) } - const res = await all(sort(values, sorter)) + const gen = sort(values, sorter) + expect(gen[Symbol.iterator]).to.be.ok() + const res = all(gen) + expect(res).to.deep.equal(['bar', 'foo']) + }) + + it('should sort all entries of an async iterator', async () => { + const values = async function * (): AsyncGenerator { + yield * ['foo', 'bar'] + } + const sorter: CompareFunction = (a, b) => { + return a.localeCompare(b) + } + + const gen = sort(values(), sorter) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) expect(res).to.deep.equal(['bar', 'foo']) }) }) diff --git a/packages/it-take/README.md b/packages/it-take/README.md index 72f780fd..6d810e43 100644 --- a/packages/it-take/README.md +++ b/packages/it-take/README.md @@ -35,10 +35,25 @@ For when you only want a few values out of an iterable. import take from 'it-take' import all from 'it-all' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -const arr = await all(take(values, 2)) +const arr = all(take(values, 2)) + +console.info(arr) // 0, 1 +``` + +Async sources must be awaited: + +```javascript +import take from 'it-take' +import all from 'it-all' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const arr = await all(take(values(), 2)) console.info(arr) // 0, 1 ``` diff --git a/packages/it-take/src/index.ts b/packages/it-take/src/index.ts index cc9df779..a8cab247 100644 --- a/packages/it-take/src/index.ts +++ b/packages/it-take/src/index.ts @@ -1,21 +1,50 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} /** * Stop iteration after n items have been received */ -export default async function * take (source: AsyncIterable | Iterable, limit: number): AsyncGenerator { - let items = 0 +function take (source: Iterable, limit: number): Generator +function take (source: AsyncIterable, limit: number): AsyncGenerator +function take (source: AsyncIterable | Iterable, limit: number): AsyncGenerator | Generator { + if (isAsyncIterable(source)) { + return (async function * () { + let items = 0 - if (limit < 1) { - return - } + if (limit < 1) { + return + } + + for await (const entry of source) { + yield entry - for await (const entry of source) { - yield entry + items++ - items++ + if (items === limit) { + return + } + } + })() + } + + return (function * () { + let items = 0 - if (items === limit) { + if (limit < 1) { return } - } + + for (const entry of source) { + yield entry + + items++ + + if (items === limit) { + return + } + } + })() } + +export default take diff --git a/packages/it-take/test/index.spec.ts b/packages/it-take/test/index.spec.ts index 3abc178e..21e8f628 100644 --- a/packages/it-take/test/index.spec.ts +++ b/packages/it-take/test/index.spec.ts @@ -3,11 +3,25 @@ import all from 'it-all' import take from '../src/index.js' describe('it-take', () => { - it('should limit the number of values returned from an iterable', async () => { + it('should limit the number of values returned from an iterable', () => { const values = [0, 1, 2, 3, 4] - const res = await all(take(values, 2)) + const gen = take(values, 2) + expect(gen[Symbol.iterator]).to.be.ok() + const res = all(gen) + expect(res).to.deep.equal([0, 1]) + }) + + it('should limit the number of values returned from an async iterable', async () => { + const values = async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + } + + const gen = take(values(), 2) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) expect(res).to.deep.equal([0, 1]) }) }) diff --git a/packages/it-to-buffer/README.md b/packages/it-to-buffer/README.md index 34bb1907..286ecc43 100644 --- a/packages/it-to-buffer/README.md +++ b/packages/it-to-buffer/README.md @@ -32,10 +32,25 @@ Loading this module through a script tag will make it's exports available as `It ```javascript import toBuffer from 'it-to-buffer' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [Buffer.from([0, 1]), Buffer.from([2, 3])] -const result = await toBuffer(values) +const result = toBuffer(values) + +console.info(result) // Buffer[0, 1, 2, 3] +``` + +Async sources must be awaited: + +```javascript +import toBuffer from 'it-to-buffer' + +const values = async function * () { + yield Buffer.from([0, 1]) + yield Buffer.from([2, 3]) +} + +const result = await toBuffer(values()) console.info(result) // Buffer[0, 1, 2, 3] ``` diff --git a/packages/it-to-buffer/src/index.ts b/packages/it-to-buffer/src/index.ts index 3547635f..632097e3 100644 --- a/packages/it-to-buffer/src/index.ts +++ b/packages/it-to-buffer/src/index.ts @@ -1,15 +1,37 @@ import { concat as uint8ArrayConcat } from 'uint8arrays/concat' +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Takes an (async) iterable that yields buffer-like-objects and concats them * into one buffer */ -export default async function toBuffer (stream: AsyncIterable | Iterable): Promise { - let buffer = new Uint8Array(0) +function toBuffer (source: Iterable): Uint8Array +function toBuffer (source: AsyncIterable): Promise +function toBuffer (source: AsyncIterable | Iterable): Promise | Uint8Array { + if (isAsyncIterable(source)) { + return (async () => { + let buffer = new Uint8Array(0) - for await (const buf of stream) { - buffer = uint8ArrayConcat([buffer, buf], buffer.length + buf.length) + for await (const buf of source) { + buffer = uint8ArrayConcat([buffer, buf], buffer.length + buf.length) + } + + return buffer + })() } - return buffer + const bufs = [] + let length = 0 + + for (const buf of source) { + bufs.push(buf) + length += buf.byteLength + } + + return uint8ArrayConcat(bufs, length) } + +export default toBuffer diff --git a/packages/it-to-buffer/test/index.spec.ts b/packages/it-to-buffer/test/index.spec.ts index 33e68260..97af0d5c 100644 --- a/packages/it-to-buffer/test/index.spec.ts +++ b/packages/it-to-buffer/test/index.spec.ts @@ -2,25 +2,40 @@ import { expect } from 'aegir/chai' import toBuffer from '../src/index.js' describe('it-to-buffer', () => { - it('should turn a generator that yields buffers into a buffer', async () => { - const iter = function * (): Generator { + it('should turn a generator that yields buffers into a buffer', () => { + const iter = function * (): Generator { yield Uint8Array.from([0]) yield Uint8Array.from([1]) yield Uint8Array.from([2]) } - const result = await toBuffer(iter()) + const result = toBuffer(iter()) + expect(result).to.not.have.property('then') expect(result).to.equalBytes(Uint8Array.from([0, 1, 2])) }) - it('should turn an array buffers into a buffer', async () => { - const result = await toBuffer([ + it('should turn an async generator that yields buffers into a buffer', async () => { + const iter = async function * (): AsyncGenerator { + yield Uint8Array.from([0]) + yield Uint8Array.from([1]) + yield Uint8Array.from([2]) + } + + const result = toBuffer(iter()) + + expect(result).to.have.property('then') + expect(await result).to.equalBytes(Uint8Array.from([0, 1, 2])) + }) + + it('should turn an array buffers into a buffer', () => { + const result = toBuffer([ Uint8Array.from([0]), Uint8Array.from([1]), Uint8Array.from([2]) ]) + expect(result).to.not.have.property('then') expect(result).to.equalBytes(Uint8Array.from([0, 1, 2])) }) }) From c34a0eb84ce73d77210639b9182b9ed4536730a6 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 29 Mar 2023 17:06:48 +0100 Subject: [PATCH 2/4] chore: make it-merge return sync --- packages/it-merge/README.md | 23 +++++++++- packages/it-merge/src/index.ts | 67 +++++++++++++++++++--------- packages/it-merge/test/index.spec.ts | 31 ++++++++++++- 3 files changed, 98 insertions(+), 23 deletions(-) diff --git a/packages/it-merge/README.md b/packages/it-merge/README.md index 30965015..9230f68e 100644 --- a/packages/it-merge/README.md +++ b/packages/it-merge/README.md @@ -35,11 +35,30 @@ Nb. sources are iterated over in parallel so the order of emitted items is not g import merge from 'it-merge' import all from 'it-all' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values1 = [0, 1, 2, 3, 4] const values2 = [5, 6, 7, 8, 9] -const arr = await all(merge(values1, values2)) +const arr = all(merge(values1, values2)) + +console.info(arr) // 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 +``` + +Async sources must be awaited: + +```javascript +import merge from 'it-merge' +import all from 'it-all' + +// This can also be an iterator, async iterator, generator, etc +const values1 = async function * () { + yield * [0, 1, 2, 3, 4] +} +const values2 = async function * () { + yield * [5, 6, 7, 8, 9] +} + +const arr = await all(merge(values1(), values2())) console.info(arr) // 0, 1, 5, 6, 2, 3, 4, 7, 8, 9 <- nb. order is not guaranteed ``` diff --git a/packages/it-merge/src/index.ts b/packages/it-merge/src/index.ts index 0f5729bc..961403b5 100644 --- a/packages/it-merge/src/index.ts +++ b/packages/it-merge/src/index.ts @@ -1,31 +1,58 @@ import { pushable } from 'it-pushable' +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Treat one or more iterables as a single iterable. * * Nb. sources are iterated over in parallel so the * order of emitted items is not guaranteed. */ -export default async function * merge (...sources: Array | Iterable>): AsyncGenerator { - const output = pushable({ - objectMode: true - }) - - void Promise.resolve().then(async () => { - try { - await Promise.all( - sources.map(async (source) => { - for await (const item of source) { - output.push(item) - } - }) - ) - - output.end() - } catch (err: any) { - output.end(err) +function merge (...sources: Array>): Generator +function merge (...sources: Array | Iterable>): AsyncGenerator +function merge (...sources: Array | Iterable>): AsyncGenerator | Generator { + const syncSources: Array> = [] + + for (const source of sources) { + if (!isAsyncIterable(source)) { + syncSources.push(source) } - }) + } + + if (syncSources.length === sources.length) { + // all sources are synchronous + return (function * () { + for (const source of syncSources) { + yield * source + } + })() + } - yield * output + return (async function * () { + const output = pushable({ + objectMode: true + }) + + void Promise.resolve().then(async () => { + try { + await Promise.all( + sources.map(async (source) => { + for await (const item of source) { + output.push(item) + } + }) + ) + + output.end() + } catch (err: any) { + output.end(err) + } + }) + + yield * output + })() } + +export default merge diff --git a/packages/it-merge/test/index.spec.ts b/packages/it-merge/test/index.spec.ts index 38f39880..d9439b42 100644 --- a/packages/it-merge/test/index.spec.ts +++ b/packages/it-merge/test/index.spec.ts @@ -7,7 +7,36 @@ describe('it-merge', () => { const values1 = [0, 1, 2, 3, 4] const values2 = [5, 6, 7, 8, 9] - const res = await all(merge(values1, values2)) + const gen = merge(values1, values2) + expect(gen[Symbol.iterator]).to.be.ok() + + const res = all(gen) + + expect(res.sort((a, b) => { + if (a < b) { + return -1 + } + + if (a > b) { + return 1 + } + + return 0 + })).to.deep.equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + }) + + it('should merge multiple async iterators', async () => { + const values1 = async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + } + const values2 = async function * (): AsyncGenerator { + yield * [5, 6, 7, 8, 9] + } + + const gen = merge(values1(), values2()) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) expect(res.sort((a, b) => { if (a < b) { From 7aa8bd039e4dd54518d41abb5819a447cee5513d Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 30 Mar 2023 17:10:30 +0100 Subject: [PATCH 3/4] chore: convert more to return non-async --- packages/it-flat-batch/README.md | 18 +++++- packages/it-flat-batch/src/index.ts | 54 +++++++++++++----- packages/it-flat-batch/test/index.spec.ts | 45 +++++++++------ packages/it-foreach/README.md | 20 ++++++- packages/it-foreach/src/index.ts | 52 +++++++++++++++-- packages/it-foreach/test/index.spec.ts | 68 +++++++++++++++++++++-- packages/it-map/README.md | 20 ++++++- packages/it-map/package.json | 3 + packages/it-map/src/index.ts | 50 ++++++++++++++++- packages/it-map/test/index.spec.ts | 40 +++++++++++-- packages/it-reduce/README.md | 18 +++++- packages/it-reduce/src/index.ts | 21 ++++++- packages/it-reduce/test/index.spec.ts | 17 +++++- packages/it-split/README.md | 25 ++++++++- packages/it-split/src/index.ts | 44 ++++++++++++--- packages/it-split/test/index.spec.ts | 59 ++++++++++++++++++-- 16 files changed, 477 insertions(+), 77 deletions(-) diff --git a/packages/it-flat-batch/README.md b/packages/it-flat-batch/README.md index b546e15a..9fc9a519 100644 --- a/packages/it-flat-batch/README.md +++ b/packages/it-flat-batch/README.md @@ -39,7 +39,23 @@ import all from 'it-all' const values = [[0, 1, 2], [3], [4]] const batchSize = 2 -const result = await all(batch(values, batchSize)) +const result = all(batch(values, batchSize)) + +console.info(result) // [0, 1], [2, 3], [4] +``` + +Async sources must be awaited: + +```javascript +import batch from 'it-flat-batch' +import all from 'it-all' + +const values = async function * () { + yield * [[0, 1, 2], [3], [4]] +} +const batchSize = 2 + +const result = await all(batch(values(), batchSize)) console.info(result) // [0, 1], [2, 3], [4] ``` diff --git a/packages/it-flat-batch/src/index.ts b/packages/it-flat-batch/src/index.ts index 308fc144..f9712133 100644 --- a/packages/it-flat-batch/src/index.ts +++ b/packages/it-flat-batch/src/index.ts @@ -1,11 +1,15 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} /** * Takes an (async) iterable that emits variable length arrays of things and * returns an async iterable that emits those things in fixed-size batches */ -export default async function * batch (source: AsyncIterable | Iterable, batchSize: number = 1): AsyncGenerator { - // @ts-expect-error - expects string not a number - let size = parseInt(batchSize) +function batch (source: Iterable, batchSize?: number): Generator +function batch (source: AsyncIterable, batchSize?: number): AsyncGenerator +function batch (source: AsyncIterable | Iterable, batchSize: number = 1): AsyncGenerator | Generator { + let size = parseInt(`${batchSize}`) if (isNaN(size) || size < 1) { size = 1 @@ -13,19 +17,43 @@ export default async function * batch (source: AsyncIterable | Iterable let things: T[] = [] - for await (const set of source) { - things = things.concat(set) + if (isAsyncIterable(source)) { + return (async function * () { + for await (const set of source) { + things = things.concat(set) - while (things.length >= size) { - yield things.slice(0, size) + while (things.length >= size) { + yield things.slice(0, size) - things = things.slice(size) - } - } + things = things.slice(size) + } + } - while (things.length > 0) { - yield things.slice(0, size) + while (things.length > 0) { + yield things.slice(0, size) - things = things.slice(size) + things = things.slice(size) + } + })() } + + return (function * () { + for (const set of source) { + things = things.concat(set) + + while (things.length >= size) { + yield things.slice(0, size) + + things = things.slice(size) + } + } + + while (things.length > 0) { + yield things.slice(0, size) + + things = things.slice(size) + } + })() } + +export default batch diff --git a/packages/it-flat-batch/test/index.spec.ts b/packages/it-flat-batch/test/index.spec.ts index cec0d70f..34b5005f 100644 --- a/packages/it-flat-batch/test/index.spec.ts +++ b/packages/it-flat-batch/test/index.spec.ts @@ -3,70 +3,79 @@ import all from 'it-all' import batch from '../src/index.js' describe('it-batch', () => { - it('should batch up emitted arrays', async () => { + it('should batch up emitted arrays', () => { const values = [[0, 1, 2], [3], [4]] - const res = await all(batch(values, 2)) + const res = all(batch(values, 2)) expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) }) - it('should batch up emitted arrays in singles', async () => { + it('should batch up asyncly emitted arrays', async () => { + const values = async function * (): AsyncGenerator { + yield * [[0, 1, 2], [3], [4]] + } + const res = await all(batch(values(), 2)) + + expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) + }) + + it('should batch up emitted arrays in singles', () => { const values = [[0, 1, 2], [3], [4]] - const res = await all(batch(values, 1)) + const res = all(batch(values, 1)) expect(res).to.deep.equal([[0], [1], [2], [3], [4]]) }) - it('should batch up emitted arrays in one array', async () => { + it('should batch up emitted arrays in one array', () => { const values = [[0, 1, 2], [3], [4]] - const res = await all(batch(values, 100)) + const res = all(batch(values, 100)) expect(res).to.deep.equal([[0, 1, 2, 3, 4]]) }) - it('should batch up emitted arrays in small arrays', async () => { + it('should batch up emitted arrays in small arrays', () => { const values = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9, 10]] - const res = await all(batch(values, 1)) + const res = all(batch(values, 1)) expect(res).to.deep.equal([[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]) }) - it('should batch up emitted arrays when no batch size is passed', async () => { + it('should batch up emitted arrays when no batch size is passed', () => { const values = [[0, 1, 2], [3], [4]] - const res = await all(batch(values)) + const res = all(batch(values)) expect(res).to.deep.equal([[0], [1], [2], [3], [4]]) }) - it('should batch up entries with negative batch size', async () => { + it('should batch up entries with negative batch size', () => { const values = [[0, 1, 2], [3], [4]] const batchSize = -1 - const res = await all(batch(values, batchSize)) + const res = all(batch(values, batchSize)) expect(res).to.deep.equal([[0], [1], [2], [3], [4]]) }) - it('should batch up entries with zero batch size', async () => { + it('should batch up entries with zero batch size', () => { const values = [[0, 1, 2], [3, 4]] const batchSize = 0 - const res = await all(batch(values, batchSize)) + const res = all(batch(values, batchSize)) expect(res).to.deep.equal([[0], [1], [2], [3], [4]]) }) - it('should batch up entries with string batch size', async () => { + it('should batch up entries with string batch size', () => { const values = [[0, 1, 2], [3, 4]] const batchSize = '2' // @ts-expect-error batchSize type is wrong - const res = await all(batch(values, batchSize)) + const res = all(batch(values, batchSize)) expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) }) - it('should batch up entries with non-integer batch size', async () => { + it('should batch up entries with non-integer batch size', () => { const values = [[0, 1, 2], [3, 4]] const batchSize = 2.5 - const res = await all(batch(values, batchSize)) + const res = all(batch(values, batchSize)) expect(res).to.deep.equal([[0, 1], [2, 3], [4]]) }) diff --git a/packages/it-foreach/README.md b/packages/it-foreach/README.md index 098836ed..dd936d61 100644 --- a/packages/it-foreach/README.md +++ b/packages/it-foreach/README.md @@ -35,15 +35,31 @@ For when you need a one-liner to collect iterable values. import each from 'it-foreach' import drain from 'it-drain' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] // prints 0, 1, 2, 3, 4 -const arr = await drain( +const arr = drain( each(values, console.info) ) ``` +Async sources and callbacks must be awaited: + +```javascript +import each from 'it-foreach' +import drain from 'it-drain' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +// prints 0, 1, 2, 3, 4 +const arr = await drain( + each(values(), console.info) +) +``` + ## License Licensed under either of diff --git a/packages/it-foreach/src/index.ts b/packages/it-foreach/src/index.ts index 8b95ab10..78aed5c9 100644 --- a/packages/it-foreach/src/index.ts +++ b/packages/it-foreach/src/index.ts @@ -1,10 +1,54 @@ +import peek from 'it-peekable' + +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} /** * Invokes the passed function for each item in an iterable */ -export default async function * forEach (source: AsyncIterable | Iterable, fn: (thing: T) => void | Promise): AsyncGenerator { - for await (const thing of source) { - await fn(thing) - yield thing +function forEach (source: Iterable, fn: (thing: T) => Promise): AsyncGenerator +function forEach (source: Iterable, fn: (thing: T) => void): Generator +function forEach (source: AsyncIterable, fn: (thing: T) => void | Promise): AsyncGenerator +function forEach (source: AsyncIterable | Iterable, fn: (thing: T) => void | Promise): AsyncGenerator | Generator { + if (isAsyncIterable(source)) { + return (async function * () { + for await (const thing of source) { + await fn(thing) + yield thing + } + })() + } + + // if fn function returns a promise we have to return an async generator + const peekable = peek(source) + const { value, done } = peekable.next() + + if (done === true) { + return (function * () {}()) + } + + const res = fn(value) + + if (typeof res?.then === 'function') { + return (async function * () { + yield value + + for await (const val of peekable) { + await fn(val) + yield val + } + })() } + + const func = fn as (val: T) => void + + return (function * () { + for (const val of source) { + func(val) + yield val + } + })() } + +export default forEach diff --git a/packages/it-foreach/test/index.spec.ts b/packages/it-foreach/test/index.spec.ts index 530511cd..da6d5398 100644 --- a/packages/it-foreach/test/index.spec.ts +++ b/packages/it-foreach/test/index.spec.ts @@ -3,25 +3,81 @@ import { expect } from 'aegir/chai' import forEach from '../src/index.js' describe('it-for-each', () => { - it('should iterate over every value', async () => { + it('should iterate over every value', () => { const values = [0, 1, 2, 3, 4] let sum = 0 - const res = await all(forEach(values, (val) => { + const gen = forEach(values, (val) => { sum += val - })) + }) + + expect(gen[Symbol.iterator]).to.be.ok() + + const res = all(gen) expect(res).to.deep.equal(values) expect(10).to.equal(sum) }) - it('should abort source', async () => { + it('should iterate over every async value', async () => { + const values = async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + } + let sum = 0 + + const gen = forEach(values(), (val) => { + sum += val + }) + + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) + + expect(res).to.deep.equal(await all(values())) + expect(10).to.equal(sum) + }) + + it('should iterate over every value asyncly', async () => { + const values = [0, 1, 2, 3, 4] + let sum = 0 + + const gen = forEach(values, async (val) => { + sum += val + }) + + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) + + expect(res).to.deep.equal(values) + expect(10).to.equal(sum) + }) + + it('should iterate over every async value asyncly', async () => { + const values = async function * (): AsyncGenerator { + yield * [0, 1, 2, 3, 4] + } + let sum = 0 + + const gen = forEach(values(), async (val) => { + sum += val + }) + + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) + + expect(res).to.deep.equal(await all(values())) + expect(10).to.equal(sum) + }) + + it('should abort source', () => { const values = [0, 1, 2, 3, 4] let sum = 0 const err = new Error('wat') try { - await all(forEach(values, (val) => { + all(forEach(values, (val) => { sum += val if (val === 3) { @@ -32,7 +88,7 @@ describe('it-for-each', () => { throw new Error('Did not abort') } catch (e) { expect(e).to.equal(err) - expect(6).to.equal(sum) + expect(sum).to.equal(6) } }) }) diff --git a/packages/it-map/README.md b/packages/it-map/README.md index b4881c83..a8373ab2 100644 --- a/packages/it-map/README.md +++ b/packages/it-map/README.md @@ -32,12 +32,26 @@ Loading this module through a script tag will make it's exports available as `It ```javascript import map from 'it-map' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -const result = await map(values, (val) => val++) +const result = map(values, (val) => val++) -console.info(result) // 15 +console.info(result) // [1, 2, 3, 4, 5] +``` + +Async sources and transforms must be awaited: + +```javascript +import map from 'it-map' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const result = await map(values(), async (val) => val++) + +console.info(result) // [1, 2, 3, 4, 5] ``` ## License diff --git a/packages/it-map/package.json b/packages/it-map/package.json index 9dc97d07..84a99a10 100644 --- a/packages/it-map/package.json +++ b/packages/it-map/package.json @@ -136,5 +136,8 @@ }, "devDependencies": { "aegir": "^38.1.7" + }, + "dependencies": { + "it-peekable": "^2.0.0" } } diff --git a/packages/it-map/src/index.ts b/packages/it-map/src/index.ts index 72dff632..2aebfb3d 100644 --- a/packages/it-map/src/index.ts +++ b/packages/it-map/src/index.ts @@ -1,9 +1,53 @@ +import peek from 'it-peekable' + +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Takes an (async) iterable and returns one with each item mapped by the passed * function */ -export default async function * map (source: AsyncIterable | Iterable, func: (val: I) => O | Promise): AsyncGenerator { - for await (const val of source) { - yield func(val) +function map (source: Iterable, func: (val: I) => Promise): AsyncGenerator +function map (source: Iterable, func: (val: I) => O): Generator +function map (source: AsyncIterable, func: (val: I) => O | Promise): AsyncGenerator +function map (source: AsyncIterable | Iterable, func: (val: I) => O | Promise): AsyncGenerator | Generator { + if (isAsyncIterable(source)) { + return (async function * () { + for await (const val of source) { + yield func(val) + } + })() + } + + // if mapping function returns a promise we have to return an async generator + const peekable = peek(source) + const { value, done } = peekable.next() + + if (done === true) { + return (function * () {}()) + } + + const res = func(value) + + // @ts-expect-error .then is not present on O + if (typeof res.then === 'function') { + return (async function * () { + yield await res + + for await (const val of peekable) { + yield func(val) + } + })() } + + const fn = func as (val: I) => O + + return (function * () { + for (const val of source) { + yield fn(val) + } + })() } + +export default map diff --git a/packages/it-map/test/index.spec.ts b/packages/it-map/test/index.spec.ts index 133d8496..773c911c 100644 --- a/packages/it-map/test/index.spec.ts +++ b/packages/it-map/test/index.spec.ts @@ -3,21 +3,53 @@ import map from '../src/index.js' describe('it-map', () => { it('should map an async iterator', async () => { - const iter = function * (): Generator { + const iter = async function * (): AsyncGenerator { yield 1 } - for await (const result of map(iter(), (val) => val + 1)) { + const gen = map(iter(), (val) => val + 1) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + for await (const result of gen) { expect(result).to.equal(2) } }) it('should map an async iterator to a promise', async () => { - const iter = function * (): Generator { + const iter = async function * (): AsyncGenerator { + yield 1 + } + + const gen = map(iter(), async (val) => val + 1) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + for await (const result of gen) { + expect(result).to.equal(2) + } + }) + + it('should map an iterator', () => { + const iter = function * (): Generator { yield 1 } - for await (const result of map(iter(), async (val) => val + 1)) { + const gen = map(iter(), (val) => val + 1) + expect(gen[Symbol.iterator]).to.be.ok() + + for (const result of gen) { + expect(result).to.equal(2) + } + }) + + it('should map an iterator to a promise', async () => { + const iter = function * (): Generator { + yield 1 + } + + const gen = map(iter(), async (val) => val + 1) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + for await (const result of gen) { expect(result).to.equal(2) } }) diff --git a/packages/it-reduce/README.md b/packages/it-reduce/README.md index 1cb6c279..d92ad968 100644 --- a/packages/it-reduce/README.md +++ b/packages/it-reduce/README.md @@ -34,10 +34,24 @@ Mostly useful for tests or when you want to be explicit about consuming an itera ```javascript import reduce from 'it-reduce' -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [0, 1, 2, 3, 4] -const result = await reduce(values, (acc, curr) => acc + curr, 0) +const result = reduce(values, (acc, curr) => acc + curr, 0) + +console.info(result) // 10 +``` + +Async sources must be awaited: + +```javascript +import reduce from 'it-reduce' + +const values = async function * () { + yield * [0, 1, 2, 3, 4] +} + +const result = await reduce(values(), (acc, curr) => acc + curr, 0) console.info(result) // 10 ``` diff --git a/packages/it-reduce/src/index.ts b/packages/it-reduce/src/index.ts index f1329d1a..28a67226 100644 --- a/packages/it-reduce/src/index.ts +++ b/packages/it-reduce/src/index.ts @@ -1,11 +1,28 @@ +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} /** * Reduces the values yielded by an (async) iterable */ -export default async function reduce (source: AsyncIterable | Iterable, func: (acc: V, curr: T) => V, init: V): Promise { - for await (const val of source) { +function reduce (source: Iterable, func: (acc: V, curr: T) => V, init: V): V +function reduce (source: AsyncIterable, func: (acc: V, curr: T) => V, init: V): Promise +function reduce (source: AsyncIterable | Iterable, func: (acc: V, curr: T) => V, init: V): Promise | V { + if (isAsyncIterable(source)) { + return (async function () { + for await (const val of source) { + init = func(init, val) + } + + return init + })() + } + + for (const val of source) { init = func(init, val) } return init } + +export default reduce diff --git a/packages/it-reduce/test/index.spec.ts b/packages/it-reduce/test/index.spec.ts index 89bff1c6..cd866f54 100644 --- a/packages/it-reduce/test/index.spec.ts +++ b/packages/it-reduce/test/index.spec.ts @@ -2,15 +2,28 @@ import { expect } from 'aegir/chai' import reduce from '../src/index.js' describe('it-reduce', () => { - it('should reduce the values yielded from an async iterator', async () => { + it('should reduce the values yielded from an iterator', () => { const iter = function * (): Generator { yield 1 yield 2 yield 3 } - const result = await reduce(iter(), (acc, curr) => acc + curr, 0) + const result = reduce(iter(), (acc, curr) => acc + curr, 0) expect(result).to.equal(6) }) + + it('should reduce the values yielded from an async iterator', async () => { + const iter = async function * (): AsyncGenerator { + yield 1 + yield 2 + yield 3 + } + + const result = reduce(iter(), (acc, curr) => acc + curr, 0) + + expect(result).to.have.property('then').that.is.a('function') + await expect(result).to.eventually.equal(6) + }) }) diff --git a/packages/it-split/README.md b/packages/it-split/README.md index cac50f4d..b7f23d53 100644 --- a/packages/it-split/README.md +++ b/packages/it-split/README.md @@ -34,13 +34,13 @@ import split from 'it-split' const encoder = new TextEncoder() -// This can also be an iterator, async iterator, generator, etc +// This can also be an iterator, generator, etc const values = [ encoder.encode('hello\nwor'), encoder.encode('ld') ] -const arr = await all(split(values)) +const arr = all(split(values)) console.info(arr) // [encoder.encode('hello'), encoder.encode('world')] ``` @@ -55,13 +55,32 @@ const values = [ ] const delimiter = Uint8Array.from([1, 2]) -const arr = await all(split(values, { +const arr = all(split(values, { delimiter })) console.info(arr) // [ Buffer.from([0]), Buffer.from([3, 0]), Buffer.from([3, 1]) ] ``` +Async sources must be awaited: + +```javascript +import split from 'it-split' + +const encoder = new TextEncoder() + +const values = async function * () { + yield * [ + encoder.encode('hello\nwor'), + encoder.encode('ld') + ] +} + +const arr = await all(split(values())) + +console.info(arr) // [encoder.encode('hello'), encoder.encode('world')] +``` + ## License Licensed under either of diff --git a/packages/it-split/src/index.ts b/packages/it-split/src/index.ts index caea10fa..9a9df636 100644 --- a/packages/it-split/src/index.ts +++ b/packages/it-split/src/index.ts @@ -4,27 +4,51 @@ export interface SplitOptions { delimiter?: Uint8Array } +function isAsyncIterable (thing: any): thing is AsyncIterable { + return thing[Symbol.asyncIterator] != null +} + /** * Splits Uint8Arrays emitted by an (async) iterable by a delimiter */ -export default async function * split (source: AsyncIterable | Iterable, options: SplitOptions = {}): AsyncGenerator { +function split (source: Iterable, options?: SplitOptions): Generator +function split (source: AsyncIterable, options?: SplitOptions): AsyncGenerator +function split (source: AsyncIterable | Iterable, options: SplitOptions = {}): AsyncGenerator | Generator { const bl = new Uint8ArrayList() const delimiter = options.delimiter ?? new TextEncoder().encode('\n') - for await (const buf of source) { - bl.append(buf) + if (isAsyncIterable(source)) { + return (async function * () { + for await (const buf of source) { + bl.append(buf) - yield * yieldUntilEnd(bl, delimiter) - } + yield * yieldUntilEnd(bl, delimiter) + } - yield * yieldUntilEnd(bl, delimiter) + yield * yieldUntilEnd(bl, delimiter) - if (bl.length > 0) { - yield bl.slice() + if (bl.length > 0) { + yield bl.slice() + } + })() } + + return (function * () { + for (const buf of source) { + bl.append(buf) + + yield * yieldUntilEnd(bl, delimiter) + } + + yield * yieldUntilEnd(bl, delimiter) + + if (bl.length > 0) { + yield bl.slice() + } + })() } -async function * yieldUntilEnd (bl: Uint8ArrayList, delimiter: Uint8Array): AsyncGenerator { +function * yieldUntilEnd (bl: Uint8ArrayList, delimiter: Uint8Array): Generator { let index = bl.indexOf(delimiter) while (index !== -1) { @@ -35,3 +59,5 @@ async function * yieldUntilEnd (bl: Uint8ArrayList, delimiter: Uint8Array): Asyn index = bl.indexOf(delimiter) } } + +export default split diff --git a/packages/it-split/test/index.spec.ts b/packages/it-split/test/index.spec.ts index a718305a..0026daa2 100644 --- a/packages/it-split/test/index.spec.ts +++ b/packages/it-split/test/index.spec.ts @@ -5,22 +5,24 @@ import split from '../src/index.js' import { Buffer } from 'buffer' describe('it-split', () => { - it('should split Uint8Arrays by newlines', async () => { + it('should split Uint8Arrays by newlines', () => { const encoder = new TextEncoder() const values = [ encoder.encode('hello\nwor'), encoder.encode('ld') ] - const res = await all(split(values)) + const gen = split(values) + expect(gen[Symbol.iterator]).to.be.ok() + const res = all(gen) expect(res.map(buf => toString(buf))).to.deep.equal([ 'hello', 'world' ]) }) - it('should split Uint8Arrays by arbitrary delimiters', async () => { + it('should split Uint8Arrays by arbitrary delimiters', () => { const values = [ Uint8Array.from([0, 1, 2, 3]), Uint8Array.from([0, 1, 2, 3]), @@ -28,9 +30,56 @@ describe('it-split', () => { ] const delimiter = Uint8Array.from([1, 2]) - const res = await all(split(values, { + const gen = split(values, { delimiter - })) + }) + expect(gen[Symbol.iterator]).to.be.ok() + + const res = all(gen) + + expect(res).to.deep.equal([ + Buffer.from([0]), + Buffer.from([3, 0]), + Buffer.from([3, 1]) + ]) + }) + + it('should split async Uint8Arrays by newlines', async () => { + const encoder = new TextEncoder() + const values = async function * (): AsyncGenerator { + yield * [ + encoder.encode('hello\nwor'), + encoder.encode('ld') + ] + } + + const gen = split(values()) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) + + expect(res.map(buf => toString(buf))).to.deep.equal([ + 'hello', + 'world' + ]) + }) + + it('should split Uint8Arrays by arbitrary delimiters', async () => { + const values = async function * (): AsyncGenerator { + yield * [ + Uint8Array.from([0, 1, 2, 3]), + Uint8Array.from([0, 1, 2, 3]), + Uint8Array.from([1, 1, 2]) + ] + } + const delimiter = Uint8Array.from([1, 2]) + + const gen = split(values(), { + delimiter + }) + expect(gen[Symbol.asyncIterator]).to.be.ok() + + const res = await all(gen) expect(res).to.deep.equal([ Buffer.from([0]), From ad9afc8a4b49332b17c83c9c7c1740dabb72ffa1 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 30 Mar 2023 17:17:09 +0100 Subject: [PATCH 4/4] chore: add missing dep --- packages/it-foreach/package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/it-foreach/package.json b/packages/it-foreach/package.json index ed47e099..1410a2ea 100644 --- a/packages/it-foreach/package.json +++ b/packages/it-foreach/package.json @@ -137,5 +137,8 @@ "devDependencies": { "aegir": "^38.1.7", "it-all": "^2.0.0" + }, + "dependencies": { + "it-peekable": "^2.0.0" } }