From 17d49f1a15d5a521211361746c491524ea42b50e Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sun, 24 Jan 2021 21:58:52 +0200 Subject: [PATCH] initial attempt to split an asyncIterable into a set of new asyncIterables // adapted from: https://stackoverflow.com/questions/63543455/how-to-multicast-an-async-iterable // and: https://gist.github.com/jed/cc1e949419d42e2cb26d7f2e1645864d // and also: https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039 --- packages/pubsub/src/index.ts | 2 +- packages/pubsub/src/split.ts | 107 +++++++++++++++++++++++++++++++++++ packages/pubsub/src/types.ts | 2 + 3 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 packages/pubsub/src/split.ts diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index 7b48b6a1965..ffdbbff8412 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -1,4 +1,4 @@ export * from './in-memory-channel'; export * from './in-memory-pubsub'; - +export * from './split'; export * from './types'; diff --git a/packages/pubsub/src/split.ts b/packages/pubsub/src/split.ts new file mode 100644 index 00000000000..4a4a37931f9 --- /dev/null +++ b/packages/pubsub/src/split.ts @@ -0,0 +1,107 @@ +// adapted from: https://stackoverflow.com/questions/63543455/how-to-multicast-an-async-iterable +// and: https://gist.github.com/jed/cc1e949419d42e2cb26d7f2e1645864d +// and also: https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039 + +import { Push, Repeater } from '@repeaterjs/repeater'; + +import { Splitter } from './types'; + +export function split(asyncIterable: AsyncIterableIterator, n: number, splitter: Splitter>) { + const iterator = asyncIterable[Symbol.asyncIterator](); + const returner = iterator.return ?? undefined; + + const buffers: Array>> = Array(n).fill([]); + + if (returner) { + const set: Set = new Set(); + return buffers.map((buffer, index) => { + set.add(index); + return new Repeater(async (push, stop) => { + let earlyReturn: any; + stop.then(() => { + set.delete(index); + if (!set.size) { + earlyReturn = returner(); + } + }); + + await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter); + + await earlyReturn; + }); + }); + } + + return buffers.map( + (buffer, index) => + new Repeater(async (push, stop) => { + let earlyReturn: any; + stop.then(() => { + earlyReturn = returner ? returner() : true; + }); + + await loop(push, earlyReturn, buffer, index, buffers, iterator, splitter); + + await earlyReturn; + }) + ); +} + +async function loop( + push: Push, + earlyReturn: Promise | any, + buffer: Array>, + index: number, + buffers: Array>>, + iterator: AsyncIterator, + splitter: Splitter> +): Promise { + /* eslint-disable no-unmodified-loop-condition */ + while (!earlyReturn) { + const iteration = await next(buffer, index, buffers, iterator, splitter); + + if (iteration === undefined) { + continue; + } + + if (iteration.done) { + stop(); + return iteration.value; + } + + await push(iteration.value); + } + /* eslint-enable no-unmodified-loop-condition */ +} + +async function next( + buffer: Array>, + index: number, + buffers: Array>>, + iterator: AsyncIterator, + splitter: Splitter> +): Promise | undefined> { + let iteration: IteratorResult; + + if (0 in buffer) { + return buffer.shift(); + } + + const iterationCandidate = await iterator.next(); + + const value = iterationCandidate.value; + if (value) { + const [iterationIndex, newValue] = splitter(value); + if (index === iterationIndex) { + return newValue; + } + + buffers[iterationIndex].push(iteration); + return undefined; + } + + for (const buffer of buffers) { + buffer.push(iteration); + } + return iterationCandidate; +} diff --git a/packages/pubsub/src/types.ts b/packages/pubsub/src/types.ts index 6aa10e02e7d..72ddce3e91c 100644 --- a/packages/pubsub/src/types.ts +++ b/packages/pubsub/src/types.ts @@ -12,3 +12,5 @@ export interface PubSub { subscribe(topic: string, buffer?: RepeaterBuffer): AsyncIterableIterator; close(reason?: any): Promise | unknown; } + +export type Splitter = (item: T) => [number, T];