diff --git a/src/Lazy/fork.ts b/src/Lazy/fork.ts new file mode 100644 index 00000000..3a0430f8 --- /dev/null +++ b/src/Lazy/fork.ts @@ -0,0 +1,257 @@ +import { isAsyncIterable, isIterable } from "../_internal/utils"; +import { LinkedList } from "../dataStructure/linkedList/linkedList"; +import type { LinkedListNode } from "../dataStructure/linkedList/linkedListNode"; +import isNil from "../isNil"; +import type IterableInfer from "../types/IterableInfer"; +import type { Reject, Resolve } from "../types/Utils"; +import type { Concurrent } from "./concurrent"; + +type ReturnForkType | AsyncIterable> = + A extends AsyncIterable + ? AsyncIterableIterator> + : IterableIterator>; + +type Value = any; + +type ForkItem = { + queue: LinkedList>; + next: () => IteratorResult; +}; + +const forkMap = new WeakMap, ForkItem>(); + +function sync(iterable: Iterable) { + const iterator = iterable[Symbol.iterator](); + + const getNext = (forkItem: ForkItem) => { + let current: LinkedListNode> | null = + forkItem.queue.getLastNode(); + + const done = () => { + iterator.next = forkItem.next; + + return { + done: true, + value: undefined, + } as const; + }; + + let isDone = false; + const next = () => { + if (isDone) { + return done(); + } + + const item = current?.getNext(); + + if (isNil(item) || item === forkItem.queue.getTail()) { + const node = forkItem.next(); + + current = forkItem.queue.insertLast(node); + isDone = node.done ?? true; + if (isDone) { + return done(); + } + + return node; + } + + current = item; + return current.getValue(); + }; + + return next; + }; + + let forkItem = forkMap.get(iterator) as ForkItem; + + if (!forkItem) { + const originNext = iterator.next.bind(iterator); + forkItem = { + queue: new LinkedList(), + next: originNext, + }; + + iterator.next = getNext(forkItem); + forkMap.set(iterator, forkItem); + } + + const next = getNext(forkItem); + + return { + [Symbol.iterator]() { + return this; + }, + + next: next, + }; +} + +type ForkAsyncItem = { + queue: LinkedList>; + next: (...args: any) => Promise>; + done: boolean; +}; + +const forkAsyncMap = new WeakMap, ForkAsyncItem>(); + +function async(iterable: AsyncIterable) { + const iterator = iterable[Symbol.asyncIterator](); + + const getNext = (forkItem: ForkAsyncItem) => { + const settlementQueue: [Resolve, Reject][] = []; + let currentNode: LinkedListNode> | null = + forkItem.queue.getLastNode() ?? null; + let nextCallCount = 0; + let resolvedCount = 0; + let prevItem = Promise.resolve(); + + const fillBuffer = (concurrent: Concurrent) => { + const nextItem = forkItem.next(concurrent); + + prevItem = prevItem + .then(() => nextItem) + .then(({ done, value }) => { + if (done) { + while (settlementQueue.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const [resolve] = settlementQueue.shift()!; + resolve({ done: true, value: undefined }); + } + + return void (forkItem.done = true); + } + + forkItem.queue.insertLast({ done: false, value }); + recur(concurrent); + }) + .catch((reason) => { + forkItem.done = true; + while (settlementQueue.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const [, reject] = settlementQueue.shift()!; + reject(reason); + } + }); + }; + + function consumeBuffer() { + while ( + forkItem.queue.hasNext(currentNode) && + nextCallCount > resolvedCount + ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const result = currentNode.getNext()!.getValue()!; + currentNode = currentNode.getNext(); + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const [resolve] = settlementQueue.shift()!; + resolve(result); + resolvedCount++; + } + } + + function recur(concurrent: Concurrent) { + if (nextCallCount === resolvedCount) { + return; + } else if (forkItem.queue.hasNext(currentNode)) { + consumeBuffer(); + } else { + fillBuffer(concurrent); + } + } + + const next = async (concurrent: Concurrent) => { + if (forkItem.done && !forkItem.queue.hasNext(currentNode)) { + return { done: true, value: undefined }; + } + + nextCallCount++; + return new Promise((resolve, reject) => { + settlementQueue.push([resolve, reject]); + recur(concurrent); + }); + }; + + return next; + }; + + let forkItem = forkAsyncMap.get(iterator) as ForkAsyncItem; + if (!forkItem) { + const originNext = iterator.next.bind(iterator); + forkItem = { + queue: new LinkedList(), + next: originNext, + done: false, + }; + + iterator.next = getNext(forkItem) as any; + forkAsyncMap.set(iterator, forkItem); + } + + const next = getNext(forkItem); + return { + [Symbol.asyncIterator]() { + return this; + }, + next: next, + }; +} + +/** + * Returns an iterable of forks of original source. Each fork contains the same values as source, and can be consumed independently. + * + * @example + * ```ts + * const arr = [1, 2, 3]; + * const iter1 = fork(arr); + * const iter2 = fork(arr); + * + * iter1.next() // {done:false, value: 1} + * iter1.next() // {done:false, value: 2} + * iter2.next() // {done:false, value: 1} + * iter2.next() // {done:false, value: 2} + * + * const str = 'abc' + * const strIter1 = fork(str); + * const strIter2 = fork(str); + * + * strIter1.next() // {done:false, value: 'a'} + * strIter1.next() // {done:false, value: 'b'} + * strIter2.next() // {done:false, value: 'a'} + * strIter2.next() // {done:false, value: 'b'} + * + * // with pipe + * const arrAdd10 = pipe( + * [1, 2, 3], + * map((a) => a + 10), + * ); + * + * const arrAdd10Iter1 = fork(arrAdd10); + * const arrAdd10Iter2 = fork(arrAdd10); + * arrAdd10Iter1.next() // { value: 11, done: false } + * arrAdd10Iter2.next() // { value: 11, done: false } + * + * const arrAdd10Iter3 = fork(arrAdd10Iter1); + * arrAdd10Iter1.next() // { value: 12, done: false } + * arrAdd10Iter1.next() // { value: 13, done: false } + * arrAdd10Iter2.next() // { value: 12, done: false } + * arrAdd10Iter3.next() // { value: 12, done: false } + * ``` + */ +// prettier-ignore +function fork | AsyncIterable>( + iterable: A, +): ReturnForkType { + if (isIterable(iterable)) { + return sync(iterable) as any; + } + + if (isAsyncIterable(iterable)) { + return async(iterable) as any; + } + + throw new TypeError("'iterable' must be type of Iterable or AsyncIterable"); +} + +export default fork; diff --git a/src/Lazy/index.ts b/src/Lazy/index.ts index 222cb7f0..1481003e 100644 --- a/src/Lazy/index.ts +++ b/src/Lazy/index.ts @@ -15,6 +15,7 @@ import entries from "./entries"; import filter from "./filter"; import flat from "./flat"; import flatMap from "./flatMap"; +import fork from "./fork"; import intersection from "./intersection"; import intersectionBy from "./intersectionBy"; import keys from "./keys"; @@ -60,6 +61,7 @@ export { filter, flat, flatMap, + fork, intersection, intersectionBy, keys, diff --git a/src/dataStructure/linkedList/linkedList.ts b/src/dataStructure/linkedList/linkedList.ts new file mode 100644 index 00000000..b7f7537e --- /dev/null +++ b/src/dataStructure/linkedList/linkedList.ts @@ -0,0 +1,93 @@ +import { LinkedListNode } from "./linkedListNode"; + +export class LinkedList { + private head: LinkedListNode; + private tail: LinkedListNode; + + constructor() { + this.head = new LinkedListNode(null as unknown as T); + this.tail = new LinkedListNode(null as unknown as T); + + this.head.setNextNode(this.tail); + this.tail.setPrevNode(this.head); + } + + insertFirst(value: T): LinkedListNode { + const node = new LinkedListNode(value); + if (this.isEmpty()) { + this.tail.setPrevNode(node); + this.head.setNextNode(node); + + node.setNextNode(this.tail); + node.setPrevNode(this.head); + } else { + const firstNode = this.head.getNext(); + if (!firstNode) { + throw new TypeError("firstNode must be a LinkedListNode"); + } + + node.setPrevNode(this.head); + node.setNextNode(firstNode); + firstNode.setPrevNode(node); + this.head.setNextNode(node); + } + + return node; + } + + insertLast(value: T): LinkedListNode { + if (this.isEmpty()) { + return this.insertFirst(value); + } + + const node = new LinkedListNode(value); + const lastNode = this.tail.getPrev(); + + if (!lastNode) { + throw new TypeError("lastNode must be a LinkedListNode"); + } + + node.setPrevNode(lastNode); + node.setNextNode(this.tail); + lastNode.setNextNode(node); + this.tail.setPrevNode(node); + + return node; + } + + isEmpty() { + return this.head.getNext() === this.tail; + } + + getHead() { + return this.head; + } + + getTail() { + return this.tail; + } + + getLastNode() { + return this.tail.getPrev(); + } + + hasNext(node?: LinkedListNode | null): node is LinkedListNode { + if (node == null) { + return false; + } + + return node.getNext() !== this.tail; + } + + toArray() { + const arr = []; + let cur = this.head; + while (cur.getNext() !== this.tail) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + cur = cur.getNext()!; + arr.push(cur.getValue()); + } + + return arr; + } +} diff --git a/src/dataStructure/linkedList/linkedListNode.ts b/src/dataStructure/linkedList/linkedListNode.ts new file mode 100644 index 00000000..516cca4a --- /dev/null +++ b/src/dataStructure/linkedList/linkedListNode.ts @@ -0,0 +1,39 @@ +export class LinkedListNode { + private value: T; + private next: LinkedListNode | null; + private prev: LinkedListNode | null; + + constructor(value: T) { + this.value = value; + this.next = null; + this.prev = null; + } + + setNextNode(node: LinkedListNode) { + this.next = node; + + return node; + } + + setPrevNode(node: LinkedListNode) { + this.prev = node; + + return node; + } + + getValue() { + return this.value; + } + + getNext() { + return this.next; + } + + getPrev() { + return this.prev; + } + + hasNext() { + return this.next instanceof LinkedListNode; + } +} diff --git a/test/Lazy/fork.spec.ts b/test/Lazy/fork.spec.ts new file mode 100644 index 00000000..92420e84 --- /dev/null +++ b/test/Lazy/fork.spec.ts @@ -0,0 +1,294 @@ +import { + concurrent, + delay, + fork, + map, + pipe, + range, + toArray, + toAsync, +} from "../../src/index"; + +describe("fork", function () { + describe("sync", function () { + it("should be forked iterable(number)", function () { + const arr = [1, 2, 3]; + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(iter1.next()).toEqual({ value: 1, done: false }); + expect(iter1.next()).toEqual({ value: 2, done: false }); + expect(iter1.next()).toEqual({ value: 3, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + + expect(iter2.next()).toEqual({ value: 1, done: false }); + expect(iter2.next()).toEqual({ value: 2, done: false }); + expect(iter2.next()).toEqual({ value: 3, done: false }); + expect(iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked iterable(string)", function () { + const arr = "abc"; + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(iter1.next()).toEqual({ value: "a", done: false }); + expect(iter1.next()).toEqual({ value: "b", done: false }); + expect(iter1.next()).toEqual({ value: "c", done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + + expect(iter2.next()).toEqual({ value: "a", done: false }); + expect(iter2.next()).toEqual({ value: "b", done: false }); + expect(iter2.next()).toEqual({ value: "c", done: false }); + expect(iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be able to be used as a forked function in the pipeline", function () { + const arr = pipe( + [1, 2, 3], + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(arr); + const iter3 = pipe( + fork(iter1), + map((a) => String(a)), + ); + + expect(arr.next()).toEqual({ value: 11, done: false }); + expect(arr.next()).toEqual({ value: 12, done: false }); + expect(arr.next()).toEqual({ value: 13, done: false }); + expect(arr.next()).toEqual({ value: undefined, done: true }); + + expect(iter1.next()).toEqual({ value: 11, done: false }); + expect(iter2.next()).toEqual({ value: 11, done: false }); + expect(iter3.next()).toEqual({ value: "11", done: false }); + + expect(iter1.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 13, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + + expect(iter2.next()).toEqual({ value: 12, done: false }); + expect(iter2.next()).toEqual({ value: 13, done: false }); + expect(iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("forked iterator proceeds independently even if there is no data to process from the original.", function () { + const arr = pipe( + [1, 2, 3], + map((a) => a + 10), + ); + + const iter1 = fork(arr); + expect(arr.next()).toEqual({ value: 11, done: false }); + expect(arr.next()).toEqual({ value: 12, done: false }); + expect(arr.next()).toEqual({ value: 13, done: false }); + expect(arr.next()).toEqual({ value: undefined, done: true }); + + expect(iter1.next()).toEqual({ value: 11, done: false }); + expect(iter1.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 13, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked in the middle of iterable progress", function () { + const arr = pipe( + [1, 2, 3], + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(iter1); + + expect(arr.next()).toEqual({ value: 11, done: false }); + expect(iter1.next()).toEqual({ value: 11, done: false }); + expect(iter2.next()).toEqual({ value: 11, done: false }); + + const iter3 = fork(arr); + const iter4 = fork(iter1); + expect(arr.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 13, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + expect(iter2.next()).toEqual({ value: 12, done: false }); + expect(iter3.next()).toEqual({ value: 12, done: false }); + expect(iter4.next()).toEqual({ value: 12, done: false }); + }); + }); + + describe("async", function () { + it("should be forked iterable(number)", async function () { + const arr = toAsync([1, 2, 3]); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await arr.next()).toEqual({ value: 1, done: false }); + expect(await iter1.next()).toEqual({ value: 1, done: false }); + expect(await iter1.next()).toEqual({ value: 2, done: false }); + expect(await iter1.next()).toEqual({ value: 3, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: 1, done: false }); + expect(await iter2.next()).toEqual({ value: 2, done: false }); + expect(await iter2.next()).toEqual({ value: 3, done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked iterable(string)", async function () { + const arr = toAsync("abc"); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await iter1.next()).toEqual({ value: "a", done: false }); + expect(await iter1.next()).toEqual({ value: "b", done: false }); + expect(await iter1.next()).toEqual({ value: "c", done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: "a", done: false }); + expect(await iter2.next()).toEqual({ value: "b", done: false }); + expect(await iter2.next()).toEqual({ value: "c", done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be able to be used as a forked function in the pipeline", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await arr.next()).toEqual({ value: 12, done: false }); + expect(await arr.next()).toEqual({ value: 13, done: false }); + expect(await arr.next()).toEqual({ value: undefined, done: true }); + + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter2.next()).toEqual({ value: 11, done: false }); + + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: 12, done: false }); + expect(await iter2.next()).toEqual({ value: 13, done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("forked iterator proceeds independently even if there is no data to process from the original", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await arr.next()).toEqual({ value: 12, done: false }); + expect(await arr.next()).toEqual({ value: 13, done: false }); + expect(await arr.next()).toEqual({ value: undefined, done: true }); + + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked in the middle of iterable progress", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(iter1); + + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter2.next()).toEqual({ value: 11, done: false }); + + const iter3 = fork(arr); + const iter4 = fork(iter1); + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + expect(await iter2.next()).toEqual({ value: 12, done: false }); + expect(await iter3.next()).toEqual({ value: 12, done: false }); + expect(await iter4.next()).toEqual({ value: 12, done: false }); + }); + + it("forked iterable should be consumed concurrently", async function () { + const iter = pipe( + toAsync(range(10)), + map((a) => delay(500, a)), + ); + + const forkedIter = fork(iter); + const arr1 = await pipe(iter, concurrent(5), toArray); + expect(arr1).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + + const arr2 = await pipe(forkedIter, concurrent(5), toArray); + expect(arr2).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }, 1050); + + it("forked iterable should be consumed concurrently (forked iterable first)", async function () { + const iter = pipe( + toAsync(range(10)), + map((a) => delay(500, a)), + ); + + const forkedIter = fork(iter); + const arr1 = await pipe(forkedIter, concurrent(5), toArray); + expect(arr1).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + + const arr2 = await pipe(iter, concurrent(5), toArray); + expect(arr2).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }, 1050); + + it("forked iterable and origin iterable must each be consumable.", async function () { + const origin = pipe( + toAsync(range(6)), + map((a) => delay(500, a)), + ); + + const iter = pipe(origin, concurrent(3)); + const forkedIter = pipe(fork(origin), concurrent(3)); + + const p1 = await iter.next(); + const pf1 = await forkedIter.next(); + const p2 = await iter.next(); + const pf2 = await forkedIter.next(); + const p3 = await iter.next(); + const pf3 = await forkedIter.next(); + const p4 = await iter.next(); + const pf4 = await forkedIter.next(); + const p5 = await iter.next(); + const pf5 = await forkedIter.next(); + const p6 = await iter.next(); + const pf6 = await forkedIter.next(); + const p7 = await iter.next(); + const pf7 = await forkedIter.next(); + + expect(p1).toEqual({ value: 0, done: false }); + expect(p2).toEqual({ value: 1, done: false }); + expect(p3).toEqual({ value: 2, done: false }); + expect(p4).toEqual({ value: 3, done: false }); + expect(p5).toEqual({ value: 4, done: false }); + expect(p6).toEqual({ value: 5, done: false }); + expect(p7).toEqual({ value: undefined, done: true }); + + expect(pf1).toEqual({ value: 0, done: false }); + expect(pf2).toEqual({ value: 1, done: false }); + expect(pf3).toEqual({ value: 2, done: false }); + expect(pf4).toEqual({ value: 3, done: false }); + expect(pf5).toEqual({ value: 4, done: false }); + expect(pf6).toEqual({ value: 5, done: false }); + expect(pf7).toEqual({ value: undefined, done: true }); + }, 1050); + }); +}); diff --git a/type-check/Lazy/fork.test.ts b/type-check/Lazy/fork.test.ts new file mode 100644 index 00000000..9b114c61 --- /dev/null +++ b/type-check/Lazy/fork.test.ts @@ -0,0 +1,17 @@ +import { fork, pipe } from "../../src"; +import * as Test from "../../src/types/Test"; + +const { checks, check } = Test; + +const res1 = fork([1, 2, 3]); +const res2 = fork("abc"); + +const res3 = pipe([1, 2, 3], fork); +const res4 = pipe("abc", fork); + +checks([ + check, Test.Pass>(), + check, Test.Pass>(), + check, Test.Pass>(), + check, Test.Pass>(), +]);