From 9f96fc06c5d46e2390a7b88d4301651c96a5273f Mon Sep 17 00:00:00 2001 From: hw Date: Mon, 12 Feb 2024 16:21:59 +0900 Subject: [PATCH 1/5] feat: add fork (iterable) --- src/Lazy/fork.ts | 102 +++++++++++++++++++++++++++++++++++ src/Lazy/index.ts | 2 + test/Lazy/fork.spec.ts | 79 +++++++++++++++++++++++++++ type-check/Lazy/fork.test.ts | 17 ++++++ 4 files changed, 200 insertions(+) create mode 100644 src/Lazy/fork.ts create mode 100644 test/Lazy/fork.spec.ts create mode 100644 type-check/Lazy/fork.test.ts diff --git a/src/Lazy/fork.ts b/src/Lazy/fork.ts new file mode 100644 index 00000000..c2c895a3 --- /dev/null +++ b/src/Lazy/fork.ts @@ -0,0 +1,102 @@ +import { isAsyncIterable, isIterable } from "../_internal/utils"; +import isNil from "../isNil"; +import type IterableInfer from "../types/IterableInfer"; + +type ReturnForkType | AsyncIterable> = + A extends AsyncIterable + ? AsyncIterableIterator> + : IterableIterator>; + +type Node = { value: T; done?: boolean }; + +class ForkItem { + node: Node; + nextNode: ForkItem | null; + + constructor(node: Node) { + this.node = node; + this.nextNode = null; + } +} + +class ForkQueue { + head: ForkItem; + + current: ForkItem; + + constructor() { + this.head = new ForkItem(null as any); + this.current = this.head; + } + + toString() { + const arr = []; + let cur: ForkItem | null = this.head.nextNode; + while (cur) { + arr.push(cur.node.value); + cur = cur.nextNode; + } + + return arr.join(", "); + } +} + +const forkMap = new WeakMap>(); + +function sync(iterable: Iterable) { + const iterator = iterable[Symbol.iterator](); + let queue = forkMap.get(iterator) as ForkQueue; + if (!queue) { + queue = new ForkQueue(); + forkMap.set(iterator, queue); + } + + let cur = queue.current; + let done = false; + + return { + [Symbol.iterator]() { + return iterator; + }, + + next() { + if (done) { + return { + done, + value: undefined, + }; + } + + const item = cur.nextNode; + if (isNil(item)) { + const node = iterator.next(); + cur.nextNode = new ForkItem(node); + cur = cur.nextNode; + + queue.current = cur; + done = node.done ?? true; + + return cur.node; + } + + cur = item; + return cur.node; + }, + }; +} + +function fork | AsyncIterable>( + iterable: A, +): ReturnForkType { + if (isIterable(iterable)) { + return sync(iterable) as ReturnForkType; + } + + if (isAsyncIterable(iterable)) { + throw new TypeError("'fork' asyncIterable isn't supported not yet"); + } + + throw new TypeError("'iterable' must be type of Iterable or AsyncIterable"); +} + +export default fork; diff --git a/src/Lazy/index.ts b/src/Lazy/index.ts index 222cb7f0..1481003e 100644 --- a/src/Lazy/index.ts +++ b/src/Lazy/index.ts @@ -15,6 +15,7 @@ import entries from "./entries"; import filter from "./filter"; import flat from "./flat"; import flatMap from "./flatMap"; +import fork from "./fork"; import intersection from "./intersection"; import intersectionBy from "./intersectionBy"; import keys from "./keys"; @@ -60,6 +61,7 @@ export { filter, flat, flatMap, + fork, intersection, intersectionBy, keys, diff --git a/test/Lazy/fork.spec.ts b/test/Lazy/fork.spec.ts new file mode 100644 index 00000000..0a7c2ae0 --- /dev/null +++ b/test/Lazy/fork.spec.ts @@ -0,0 +1,79 @@ +import { fork, map, pipe } from "../../src/index"; + +describe("fork", function () { + describe("sync", function () { + it("should be forked iterable(number)", function () { + const arr = [1, 2, 3]; + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(iter1.next()).toEqual({ value: 1, done: false }); + expect(iter1.next()).toEqual({ value: 2, done: false }); + expect(iter1.next()).toEqual({ value: 3, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + + expect(iter2.next()).toEqual({ value: 1, done: false }); + expect(iter2.next()).toEqual({ value: 2, done: false }); + expect(iter2.next()).toEqual({ value: 3, done: false }); + expect(iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked iterable(string)", function () { + const arr = "abc"; + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(iter1.next()).toEqual({ value: "a", done: false }); + expect(iter1.next()).toEqual({ value: "b", done: false }); + expect(iter1.next()).toEqual({ value: "c", done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + + expect(iter2.next()).toEqual({ value: "a", done: false }); + expect(iter2.next()).toEqual({ value: "b", done: false }); + expect(iter2.next()).toEqual({ value: "c", done: false }); + expect(iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be able to be used as a forked function in the pipeline", function () { + const arr = pipe( + [1, 2, 3], + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(arr); + expect(iter1.next()).toEqual({ value: 11, done: false }); + expect(iter2.next()).toEqual({ value: 11, done: false }); + + expect(iter1.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 13, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + + expect(iter2.next()).toEqual({ value: 12, done: false }); + expect(iter2.next()).toEqual({ value: 13, done: false }); + expect(iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked in the middle of iterable progress", function () { + const arr = pipe( + [1, 2, 3], + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(iter1); + + expect(iter1.next()).toEqual({ value: 11, done: false }); + expect(iter2.next()).toEqual({ value: 11, done: false }); + + const iter3 = fork(iter1); + expect(iter1.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 13, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + expect(iter2.next()).toEqual({ value: 12, done: false }); + expect(iter3.next()).toEqual({ value: 12, done: false }); + }); + }); +}); diff --git a/type-check/Lazy/fork.test.ts b/type-check/Lazy/fork.test.ts new file mode 100644 index 00000000..9b114c61 --- /dev/null +++ b/type-check/Lazy/fork.test.ts @@ -0,0 +1,17 @@ +import { fork, pipe } from "../../src"; +import * as Test from "../../src/types/Test"; + +const { checks, check } = Test; + +const res1 = fork([1, 2, 3]); +const res2 = fork("abc"); + +const res3 = pipe([1, 2, 3], fork); +const res4 = pipe("abc", fork); + +checks([ + check, Test.Pass>(), + check, Test.Pass>(), + check, Test.Pass>(), + check, Test.Pass>(), +]); From 269c84c129b8de855b5720a1d43e8d4697815dce Mon Sep 17 00:00:00 2001 From: hw Date: Thu, 7 Mar 2024 14:50:34 +0900 Subject: [PATCH 2/5] feat: add linked list --- src/Lazy/fork.ts | 60 +++++-------------- src/dataStructure/linkedList/linkedList.ts | 59 ++++++++++++++++++ .../linkedList/linkedListNode.ts | 27 +++++++++ 3 files changed, 101 insertions(+), 45 deletions(-) create mode 100644 src/dataStructure/linkedList/linkedList.ts create mode 100644 src/dataStructure/linkedList/linkedListNode.ts diff --git a/src/Lazy/fork.ts b/src/Lazy/fork.ts index c2c895a3..559f69f0 100644 --- a/src/Lazy/fork.ts +++ b/src/Lazy/fork.ts @@ -1,4 +1,6 @@ import { isAsyncIterable, isIterable } from "../_internal/utils"; +import { LinkedList } from "../dataStructure/linkedList/linkedList"; +import type { LinkedListNode } from "../dataStructure/linkedList/linkedListNode"; import isNil from "../isNil"; import type IterableInfer from "../types/IterableInfer"; @@ -7,51 +9,22 @@ type ReturnForkType | AsyncIterable> = ? AsyncIterableIterator> : IterableIterator>; -type Node = { value: T; done?: boolean }; +type Value = any; -class ForkItem { - node: Node; - nextNode: ForkItem | null; - - constructor(node: Node) { - this.node = node; - this.nextNode = null; - } -} - -class ForkQueue { - head: ForkItem; - - current: ForkItem; - - constructor() { - this.head = new ForkItem(null as any); - this.current = this.head; - } - - toString() { - const arr = []; - let cur: ForkItem | null = this.head.nextNode; - while (cur) { - arr.push(cur.node.value); - cur = cur.nextNode; - } - - return arr.join(", "); - } -} - -const forkMap = new WeakMap>(); +const forkMap = new WeakMap< + Iterator, + LinkedList> +>(); function sync(iterable: Iterable) { const iterator = iterable[Symbol.iterator](); - let queue = forkMap.get(iterator) as ForkQueue; + let queue = forkMap.get(iterator) as LinkedList>; if (!queue) { - queue = new ForkQueue(); + queue = new LinkedList(); forkMap.set(iterator, queue); } - let cur = queue.current; + let current: LinkedListNode> | null = queue.getTail(); let done = false; return { @@ -67,20 +40,17 @@ function sync(iterable: Iterable) { }; } - const item = cur.nextNode; + const item = current?.getNext(); if (isNil(item)) { const node = iterator.next(); - cur.nextNode = new ForkItem(node); - cur = cur.nextNode; - - queue.current = cur; + current = queue.insertLast(node); done = node.done ?? true; - return cur.node; + return node; } - cur = item; - return cur.node; + current = item; + return current.getValue(); }, }; } diff --git a/src/dataStructure/linkedList/linkedList.ts b/src/dataStructure/linkedList/linkedList.ts new file mode 100644 index 00000000..e371ff6c --- /dev/null +++ b/src/dataStructure/linkedList/linkedList.ts @@ -0,0 +1,59 @@ +import { LinkedListNode } from "./linkedListNode"; + +export class LinkedList { + private head: LinkedListNode; + private tail: LinkedListNode; + + constructor() { + this.head = new LinkedListNode(null as unknown as T); + this.tail = this.head; + } + + insertFirst(value: T): LinkedListNode { + const node = new LinkedListNode(value); + if (this.isEmpty()) { + this.tail = node; + this.head.setNextNode(node); + } else { + node.setNextNode(this.head.getNext()); + this.head.setNextNode(node); + } + + return node; + } + + insertLast(value: T): LinkedListNode { + if (this.isEmpty()) { + return this.insertFirst(value); + } + + const node = new LinkedListNode(value); + this.tail?.setNextNode(node); + this.tail = node; + return node; + } + + isEmpty() { + return !this.head.hasNext(); + } + + getHead() { + return this.head; + } + + getTail() { + return this.tail; + } + + toArray() { + const arr = []; + let cur = this.head; + while (cur.hasNext()) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + cur = cur.getNext()!; + arr.push(cur); + } + + return arr; + } +} diff --git a/src/dataStructure/linkedList/linkedListNode.ts b/src/dataStructure/linkedList/linkedListNode.ts new file mode 100644 index 00000000..f479aa27 --- /dev/null +++ b/src/dataStructure/linkedList/linkedListNode.ts @@ -0,0 +1,27 @@ +export class LinkedListNode { + private value: T; + private next: LinkedListNode | null; + + constructor(value: T) { + this.value = value; + this.next = null; + } + + setNextNode(node: LinkedListNode | null) { + this.next = node; + + return node; + } + + getValue() { + return this.value; + } + + getNext() { + return this.next; + } + + hasNext() { + return this.next instanceof LinkedListNode; + } +} From a40f6a7135079fa678f925a18c40a4256d18ad08 Mon Sep 17 00:00:00 2001 From: hw Date: Thu, 7 Mar 2024 14:50:51 +0900 Subject: [PATCH 3/5] docs: fork --- src/Lazy/fork.ts | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/Lazy/fork.ts b/src/Lazy/fork.ts index 559f69f0..be81d504 100644 --- a/src/Lazy/fork.ts +++ b/src/Lazy/fork.ts @@ -55,6 +55,48 @@ function sync(iterable: Iterable) { }; } +/** + * Returns an iterable of forks of original source. Each fork contains the same values as source, and can be consumed independently. + * + * @example + * ```ts + * const arr = [1, 2, 3]; + * const iter1 = fork(arr); + * const iter2 = fork(arr); + * + * iter1.next() // {done:false, value: 1} + * iter1.next() // {done:false, value: 2} + * iter2.next() // {done:false, value: 1} + * iter2.next() // {done:false, value: 2} + * + * const str = 'abc' + * const strIter1 = fork(str); + * const strIter2 = fork(str); + * + * strIter1.next() // {done:false, value: 'a'} + * strIter1.next() // {done:false, value: 'b'} + * strIter2.next() // {done:false, value: 'a'} + * strIter2.next() // {done:false, value: 'b'} + * + * // with pipe + * const arrAdd10 = pipe( + * [1, 2, 3], + * map((a) => a + 10), + * ); + * + * const arrAdd10Iter1 = fork(arrAdd10); + * const arrAdd10Iter2 = fork(arrAdd10); + * arrAdd10Iter1.next() // { value: 11, done: false } + * arrAdd10Iter2.next() // { value: 11, done: false } + * + * const arrAdd10Iter3 = fork(arrAdd10Iter1); + * arrAdd10Iter1.next() // { value: 12, done: false } + * arrAdd10Iter1.next() // { value: 13, done: false } + * arrAdd10Iter2.next() // { value: 12, done: false } + * arrAdd10Iter3.next() // { value: 12, done: false } + * ``` + */ +// prettier-ignore function fork | AsyncIterable>( iterable: A, ): ReturnForkType { From ac4b92781f0ff16789a9d034562d9aae5859a272 Mon Sep 17 00:00:00 2001 From: hw Date: Sun, 10 Mar 2024 17:22:00 +0900 Subject: [PATCH 4/5] feat: async fork --- src/Lazy/fork.ts | 167 +++++++++++++++--- src/dataStructure/linkedList/linkedList.ts | 40 ++++- .../linkedList/linkedListNode.ts | 14 +- test/Lazy/fork.spec.ts | 141 ++++++++++++++- 4 files changed, 325 insertions(+), 37 deletions(-) diff --git a/src/Lazy/fork.ts b/src/Lazy/fork.ts index be81d504..609a3e1d 100644 --- a/src/Lazy/fork.ts +++ b/src/Lazy/fork.ts @@ -11,47 +11,160 @@ type ReturnForkType | AsyncIterable> = type Value = any; -const forkMap = new WeakMap< - Iterator, - LinkedList> ->(); +type ForkItem = { + queue: LinkedList>; + originNext: () => IteratorResult; +}; + +const forkMap = new WeakMap, ForkItem>(); function sync(iterable: Iterable) { const iterator = iterable[Symbol.iterator](); - let queue = forkMap.get(iterator) as LinkedList>; - if (!queue) { - queue = new LinkedList(); - forkMap.set(iterator, queue); - } - let current: LinkedListNode> | null = queue.getTail(); - let done = false; + const getNext = (forkItem: ForkItem) => { + let current: LinkedListNode> | null = + forkItem.queue.getLastNode(); - return { - [Symbol.iterator]() { - return iterator; - }, + const done = () => { + iterator.next = forkItem.originNext; + + return { + done: true, + value: undefined, + } as const; + }; - next() { - if (done) { - return { - done, - value: undefined, - }; + let isDone = false; + const next = () => { + if (isDone) { + return done(); } const item = current?.getNext(); - if (isNil(item)) { - const node = iterator.next(); - current = queue.insertLast(node); - done = node.done ?? true; + + if (isNil(item) || item === forkItem.queue.getTail()) { + const node = forkItem.originNext(); + + current = forkItem.queue.insertLast(node); + isDone = node.done ?? true; + if (isDone) { + return done(); + } return node; } current = item; return current.getValue(); + }; + + return next; + }; + + let forkItem = forkMap.get(iterator) as ForkItem; + + if (!forkItem) { + const originNext = iterator.next.bind(iterator); + forkItem = { + queue: new LinkedList(), + originNext: originNext, + }; + + iterator.next = getNext(forkItem); + forkMap.set(iterator, forkItem); + } + + const next = getNext(forkItem); + + return { + [Symbol.iterator]() { + return this; + }, + + next: next, + }; +} + +type ForkAsyncItem = { + queue: LinkedList>; + next: (...args: any) => Promise>; +}; + +const forkAsyncMap = new WeakMap, ForkAsyncItem>(); + +function async(iterable: AsyncIterable) { + const iterator = iterable[Symbol.asyncIterator](); + + const getNext = (forkItem: ForkAsyncItem) => { + let current: Promise> | null> = + Promise.resolve(forkItem.queue.getLastNode()); + + const done = () => { + iterator.next = forkItem.next; + + return { + done: true, + value: undefined, + } as const; + }; + + let isDone = false; + const next = async (_concurrent: any) => { + if (isDone) { + return done(); + } + + const itemCurrent = await current; + const item = itemCurrent?.getNext(); + + return new Promise((resolve, reject) => { + if (isNil(item) || item === forkItem.queue.getTail()) { + return forkItem + .next(_concurrent) + .then((node) => { + current = current.then(() => { + return forkItem.queue.insertLast(node); + }); + + isDone = node.done ?? true; + if (isDone) { + return resolve(done()); + } + + return resolve(node); + }) + .catch(reject); + } + + current = current.then(() => { + return item; + }); + + resolve(item.getValue()); + }); + }; + + return next; + }; + + let forkItem = forkAsyncMap.get(iterator) as ForkAsyncItem; + if (!forkItem) { + const originNext = iterator.next.bind(iterator); + forkItem = { + queue: new LinkedList(), + next: originNext, + }; + + iterator.next = getNext(forkItem) as any; + forkAsyncMap.set(iterator, forkItem); + } + + const next = getNext(forkItem); + return { + [Symbol.asyncIterator]() { + return this; }, + next: next, }; } @@ -101,11 +214,11 @@ function fork | AsyncIterable>( iterable: A, ): ReturnForkType { if (isIterable(iterable)) { - return sync(iterable) as ReturnForkType; + return sync(iterable) as any; } if (isAsyncIterable(iterable)) { - throw new TypeError("'fork' asyncIterable isn't supported not yet"); + return async(iterable) as any; } throw new TypeError("'iterable' must be type of Iterable or AsyncIterable"); diff --git a/src/dataStructure/linkedList/linkedList.ts b/src/dataStructure/linkedList/linkedList.ts index e371ff6c..59e96264 100644 --- a/src/dataStructure/linkedList/linkedList.ts +++ b/src/dataStructure/linkedList/linkedList.ts @@ -6,16 +6,29 @@ export class LinkedList { constructor() { this.head = new LinkedListNode(null as unknown as T); - this.tail = this.head; + this.tail = new LinkedListNode(null as unknown as T); + + this.head.setNextNode(this.tail); + this.tail.setPrevNode(this.head); } insertFirst(value: T): LinkedListNode { const node = new LinkedListNode(value); if (this.isEmpty()) { - this.tail = node; + this.tail.setPrevNode(node); this.head.setNextNode(node); + + node.setNextNode(this.tail); + node.setPrevNode(this.head); } else { - node.setNextNode(this.head.getNext()); + const firstNode = this.head.getNext(); + if (!firstNode) { + throw new TypeError("firstNode must be a LinkedListNode"); + } + + node.setPrevNode(this.head); + node.setNextNode(firstNode); + firstNode.setPrevNode(node); this.head.setNextNode(node); } @@ -28,8 +41,17 @@ export class LinkedList { } const node = new LinkedListNode(value); - this.tail?.setNextNode(node); - this.tail = node; + const lastNode = this.tail.getPrev(); + + if (!lastNode) { + throw new TypeError("lastNode must be a LinkedListNode"); + } + + node.setPrevNode(lastNode); + node.setNextNode(this.tail); + lastNode.setNextNode(node); + this.tail.setPrevNode(node); + return node; } @@ -45,13 +67,17 @@ export class LinkedList { return this.tail; } + getLastNode() { + return this.tail.getPrev(); + } + toArray() { const arr = []; let cur = this.head; - while (cur.hasNext()) { + while (cur.getNext() !== this.tail) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion cur = cur.getNext()!; - arr.push(cur); + arr.push(cur.getValue()); } return arr; diff --git a/src/dataStructure/linkedList/linkedListNode.ts b/src/dataStructure/linkedList/linkedListNode.ts index f479aa27..516cca4a 100644 --- a/src/dataStructure/linkedList/linkedListNode.ts +++ b/src/dataStructure/linkedList/linkedListNode.ts @@ -1,18 +1,26 @@ export class LinkedListNode { private value: T; private next: LinkedListNode | null; + private prev: LinkedListNode | null; constructor(value: T) { this.value = value; this.next = null; + this.prev = null; } - setNextNode(node: LinkedListNode | null) { + setNextNode(node: LinkedListNode) { this.next = node; return node; } + setPrevNode(node: LinkedListNode) { + this.prev = node; + + return node; + } + getValue() { return this.value; } @@ -21,6 +29,10 @@ export class LinkedListNode { return this.next; } + getPrev() { + return this.prev; + } + hasNext() { return this.next instanceof LinkedListNode; } diff --git a/test/Lazy/fork.spec.ts b/test/Lazy/fork.spec.ts index 0a7c2ae0..1060a327 100644 --- a/test/Lazy/fork.spec.ts +++ b/test/Lazy/fork.spec.ts @@ -1,4 +1,4 @@ -import { fork, map, pipe } from "../../src/index"; +import { fork, map, pipe, toAsync } from "../../src/index"; describe("fork", function () { describe("sync", function () { @@ -44,8 +44,19 @@ describe("fork", function () { const iter1 = fork(arr); const iter2 = fork(arr); + const iter3 = pipe( + fork(iter1), + map((a) => String(a)), + ); + + expect(arr.next()).toEqual({ value: 11, done: false }); + expect(arr.next()).toEqual({ value: 12, done: false }); + expect(arr.next()).toEqual({ value: 13, done: false }); + expect(arr.next()).toEqual({ value: undefined, done: true }); + expect(iter1.next()).toEqual({ value: 11, done: false }); expect(iter2.next()).toEqual({ value: 11, done: false }); + expect(iter3.next()).toEqual({ value: "11", done: false }); expect(iter1.next()).toEqual({ value: 12, done: false }); expect(iter1.next()).toEqual({ value: 13, done: false }); @@ -56,6 +67,24 @@ describe("fork", function () { expect(iter2.next()).toEqual({ value: undefined, done: true }); }); + it("forked iterator proceeds independently even if there is no data to process from the original.", function () { + const arr = pipe( + [1, 2, 3], + map((a) => a + 10), + ); + + const iter1 = fork(arr); + expect(arr.next()).toEqual({ value: 11, done: false }); + expect(arr.next()).toEqual({ value: 12, done: false }); + expect(arr.next()).toEqual({ value: 13, done: false }); + expect(arr.next()).toEqual({ value: undefined, done: true }); + + expect(iter1.next()).toEqual({ value: 11, done: false }); + expect(iter1.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 13, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + }); + it("should be forked in the middle of iterable progress", function () { const arr = pipe( [1, 2, 3], @@ -65,15 +94,123 @@ describe("fork", function () { const iter1 = fork(arr); const iter2 = fork(iter1); + expect(arr.next()).toEqual({ value: 11, done: false }); expect(iter1.next()).toEqual({ value: 11, done: false }); expect(iter2.next()).toEqual({ value: 11, done: false }); - const iter3 = fork(iter1); + const iter3 = fork(arr); + const iter4 = fork(iter1); + expect(arr.next()).toEqual({ value: 12, done: false }); expect(iter1.next()).toEqual({ value: 12, done: false }); expect(iter1.next()).toEqual({ value: 13, done: false }); expect(iter1.next()).toEqual({ value: undefined, done: true }); expect(iter2.next()).toEqual({ value: 12, done: false }); expect(iter3.next()).toEqual({ value: 12, done: false }); + expect(iter4.next()).toEqual({ value: 12, done: false }); + }); + }); + + describe("async", function () { + it("should be forked iterable(number)", async function () { + const arr = toAsync([1, 2, 3]); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await arr.next()).toEqual({ value: 1, done: false }); + expect(await iter1.next()).toEqual({ value: 1, done: false }); + expect(await iter1.next()).toEqual({ value: 2, done: false }); + expect(await iter1.next()).toEqual({ value: 3, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: 1, done: false }); + expect(await iter2.next()).toEqual({ value: 2, done: false }); + expect(await iter2.next()).toEqual({ value: 3, done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked iterable(string)", async function () { + const arr = toAsync("abc"); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await iter1.next()).toEqual({ value: "a", done: false }); + expect(await iter1.next()).toEqual({ value: "b", done: false }); + expect(await iter1.next()).toEqual({ value: "c", done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: "a", done: false }); + expect(await iter2.next()).toEqual({ value: "b", done: false }); + expect(await iter2.next()).toEqual({ value: "c", done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be able to be used as a forked function in the pipeline", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await arr.next()).toEqual({ value: 12, done: false }); + expect(await arr.next()).toEqual({ value: 13, done: false }); + expect(await arr.next()).toEqual({ value: undefined, done: true }); + + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter2.next()).toEqual({ value: 11, done: false }); + + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: 12, done: false }); + expect(await iter2.next()).toEqual({ value: 13, done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("forked iterator proceeds independently even if there is no data to process from the original", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await arr.next()).toEqual({ value: 12, done: false }); + expect(await arr.next()).toEqual({ value: 13, done: false }); + expect(await arr.next()).toEqual({ value: undefined, done: true }); + + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked in the middle of iterable progress", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(iter1); + + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter2.next()).toEqual({ value: 11, done: false }); + + const iter3 = fork(arr); + const iter4 = fork(iter1); + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + expect(await iter2.next()).toEqual({ value: 12, done: false }); + expect(await iter3.next()).toEqual({ value: 12, done: false }); + expect(await iter4.next()).toEqual({ value: 12, done: false }); }); }); }); From b34efe6f715d8c645e2861120a67ee5f15205937 Mon Sep 17 00:00:00 2001 From: hw Date: Mon, 11 Mar 2024 23:41:11 +0900 Subject: [PATCH 5/5] feat: concurrent fork --- src/Lazy/fork.ts | 116 +++++++++++++-------- src/dataStructure/linkedList/linkedList.ts | 10 +- test/Lazy/fork.spec.ts | 80 +++++++++++++- 3 files changed, 161 insertions(+), 45 deletions(-) diff --git a/src/Lazy/fork.ts b/src/Lazy/fork.ts index 609a3e1d..3a0430f8 100644 --- a/src/Lazy/fork.ts +++ b/src/Lazy/fork.ts @@ -3,6 +3,8 @@ import { LinkedList } from "../dataStructure/linkedList/linkedList"; import type { LinkedListNode } from "../dataStructure/linkedList/linkedListNode"; import isNil from "../isNil"; import type IterableInfer from "../types/IterableInfer"; +import type { Reject, Resolve } from "../types/Utils"; +import type { Concurrent } from "./concurrent"; type ReturnForkType | AsyncIterable> = A extends AsyncIterable @@ -13,7 +15,7 @@ type Value = any; type ForkItem = { queue: LinkedList>; - originNext: () => IteratorResult; + next: () => IteratorResult; }; const forkMap = new WeakMap, ForkItem>(); @@ -26,7 +28,7 @@ function sync(iterable: Iterable) { forkItem.queue.getLastNode(); const done = () => { - iterator.next = forkItem.originNext; + iterator.next = forkItem.next; return { done: true, @@ -43,7 +45,7 @@ function sync(iterable: Iterable) { const item = current?.getNext(); if (isNil(item) || item === forkItem.queue.getTail()) { - const node = forkItem.originNext(); + const node = forkItem.next(); current = forkItem.queue.insertLast(node); isDone = node.done ?? true; @@ -67,7 +69,7 @@ function sync(iterable: Iterable) { const originNext = iterator.next.bind(iterator); forkItem = { queue: new LinkedList(), - originNext: originNext, + next: originNext, }; iterator.next = getNext(forkItem); @@ -88,6 +90,7 @@ function sync(iterable: Iterable) { type ForkAsyncItem = { queue: LinkedList>; next: (...args: any) => Promise>; + done: boolean; }; const forkAsyncMap = new WeakMap, ForkAsyncItem>(); @@ -96,51 +99,77 @@ function async(iterable: AsyncIterable) { const iterator = iterable[Symbol.asyncIterator](); const getNext = (forkItem: ForkAsyncItem) => { - let current: Promise> | null> = - Promise.resolve(forkItem.queue.getLastNode()); - - const done = () => { - iterator.next = forkItem.next; - - return { - done: true, - value: undefined, - } as const; + const settlementQueue: [Resolve, Reject][] = []; + let currentNode: LinkedListNode> | null = + forkItem.queue.getLastNode() ?? null; + let nextCallCount = 0; + let resolvedCount = 0; + let prevItem = Promise.resolve(); + + const fillBuffer = (concurrent: Concurrent) => { + const nextItem = forkItem.next(concurrent); + + prevItem = prevItem + .then(() => nextItem) + .then(({ done, value }) => { + if (done) { + while (settlementQueue.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const [resolve] = settlementQueue.shift()!; + resolve({ done: true, value: undefined }); + } + + return void (forkItem.done = true); + } + + forkItem.queue.insertLast({ done: false, value }); + recur(concurrent); + }) + .catch((reason) => { + forkItem.done = true; + while (settlementQueue.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const [, reject] = settlementQueue.shift()!; + reject(reason); + } + }); }; - let isDone = false; - const next = async (_concurrent: any) => { - if (isDone) { - return done(); + function consumeBuffer() { + while ( + forkItem.queue.hasNext(currentNode) && + nextCallCount > resolvedCount + ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const result = currentNode.getNext()!.getValue()!; + currentNode = currentNode.getNext(); + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const [resolve] = settlementQueue.shift()!; + resolve(result); + resolvedCount++; + } + } + + function recur(concurrent: Concurrent) { + if (nextCallCount === resolvedCount) { + return; + } else if (forkItem.queue.hasNext(currentNode)) { + consumeBuffer(); + } else { + fillBuffer(concurrent); } + } - const itemCurrent = await current; - const item = itemCurrent?.getNext(); + const next = async (concurrent: Concurrent) => { + if (forkItem.done && !forkItem.queue.hasNext(currentNode)) { + return { done: true, value: undefined }; + } + nextCallCount++; return new Promise((resolve, reject) => { - if (isNil(item) || item === forkItem.queue.getTail()) { - return forkItem - .next(_concurrent) - .then((node) => { - current = current.then(() => { - return forkItem.queue.insertLast(node); - }); - - isDone = node.done ?? true; - if (isDone) { - return resolve(done()); - } - - return resolve(node); - }) - .catch(reject); - } - - current = current.then(() => { - return item; - }); - - resolve(item.getValue()); + settlementQueue.push([resolve, reject]); + recur(concurrent); }); }; @@ -153,6 +182,7 @@ function async(iterable: AsyncIterable) { forkItem = { queue: new LinkedList(), next: originNext, + done: false, }; iterator.next = getNext(forkItem) as any; diff --git a/src/dataStructure/linkedList/linkedList.ts b/src/dataStructure/linkedList/linkedList.ts index 59e96264..b7f7537e 100644 --- a/src/dataStructure/linkedList/linkedList.ts +++ b/src/dataStructure/linkedList/linkedList.ts @@ -56,7 +56,7 @@ export class LinkedList { } isEmpty() { - return !this.head.hasNext(); + return this.head.getNext() === this.tail; } getHead() { @@ -71,6 +71,14 @@ export class LinkedList { return this.tail.getPrev(); } + hasNext(node?: LinkedListNode | null): node is LinkedListNode { + if (node == null) { + return false; + } + + return node.getNext() !== this.tail; + } + toArray() { const arr = []; let cur = this.head; diff --git a/test/Lazy/fork.spec.ts b/test/Lazy/fork.spec.ts index 1060a327..92420e84 100644 --- a/test/Lazy/fork.spec.ts +++ b/test/Lazy/fork.spec.ts @@ -1,4 +1,13 @@ -import { fork, map, pipe, toAsync } from "../../src/index"; +import { + concurrent, + delay, + fork, + map, + pipe, + range, + toArray, + toAsync, +} from "../../src/index"; describe("fork", function () { describe("sync", function () { @@ -212,5 +221,74 @@ describe("fork", function () { expect(await iter3.next()).toEqual({ value: 12, done: false }); expect(await iter4.next()).toEqual({ value: 12, done: false }); }); + + it("forked iterable should be consumed concurrently", async function () { + const iter = pipe( + toAsync(range(10)), + map((a) => delay(500, a)), + ); + + const forkedIter = fork(iter); + const arr1 = await pipe(iter, concurrent(5), toArray); + expect(arr1).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + + const arr2 = await pipe(forkedIter, concurrent(5), toArray); + expect(arr2).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }, 1050); + + it("forked iterable should be consumed concurrently (forked iterable first)", async function () { + const iter = pipe( + toAsync(range(10)), + map((a) => delay(500, a)), + ); + + const forkedIter = fork(iter); + const arr1 = await pipe(forkedIter, concurrent(5), toArray); + expect(arr1).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + + const arr2 = await pipe(iter, concurrent(5), toArray); + expect(arr2).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }, 1050); + + it("forked iterable and origin iterable must each be consumable.", async function () { + const origin = pipe( + toAsync(range(6)), + map((a) => delay(500, a)), + ); + + const iter = pipe(origin, concurrent(3)); + const forkedIter = pipe(fork(origin), concurrent(3)); + + const p1 = await iter.next(); + const pf1 = await forkedIter.next(); + const p2 = await iter.next(); + const pf2 = await forkedIter.next(); + const p3 = await iter.next(); + const pf3 = await forkedIter.next(); + const p4 = await iter.next(); + const pf4 = await forkedIter.next(); + const p5 = await iter.next(); + const pf5 = await forkedIter.next(); + const p6 = await iter.next(); + const pf6 = await forkedIter.next(); + const p7 = await iter.next(); + const pf7 = await forkedIter.next(); + + expect(p1).toEqual({ value: 0, done: false }); + expect(p2).toEqual({ value: 1, done: false }); + expect(p3).toEqual({ value: 2, done: false }); + expect(p4).toEqual({ value: 3, done: false }); + expect(p5).toEqual({ value: 4, done: false }); + expect(p6).toEqual({ value: 5, done: false }); + expect(p7).toEqual({ value: undefined, done: true }); + + expect(pf1).toEqual({ value: 0, done: false }); + expect(pf2).toEqual({ value: 1, done: false }); + expect(pf3).toEqual({ value: 2, done: false }); + expect(pf4).toEqual({ value: 3, done: false }); + expect(pf5).toEqual({ value: 4, done: false }); + expect(pf6).toEqual({ value: 5, done: false }); + expect(pf7).toEqual({ value: undefined, done: true }); + }, 1050); }); });