Skip to content

Commit

Permalink
feat: concurrent fork
Browse files Browse the repository at this point in the history
  • Loading branch information
ppeeou committed Mar 11, 2024
1 parent ac4b927 commit d31d011
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 45 deletions.
116 changes: 73 additions & 43 deletions src/Lazy/fork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { LinkedList } from "../dataStructure/linkedList/linkedList";
import type { LinkedListNode } from "../dataStructure/linkedList/linkedListNode";
import isNil from "../isNil";
import type IterableInfer from "../types/IterableInfer";
import type { Reject, Resolve } from "../types/Utils";
import type { Concurrent } from "./concurrent";

type ReturnForkType<A extends Iterable<unknown> | AsyncIterable<unknown>> =
A extends AsyncIterable<any>
Expand All @@ -13,7 +15,7 @@ type Value = any;

type ForkItem = {
queue: LinkedList<IteratorResult<Value>>;
originNext: () => IteratorResult<Value, any>;
next: () => IteratorResult<Value, any>;
};

const forkMap = new WeakMap<Iterator<Value>, ForkItem>();
Expand All @@ -26,7 +28,7 @@ function sync<T>(iterable: Iterable<T>) {
forkItem.queue.getLastNode();

const done = () => {
iterator.next = forkItem.originNext;
iterator.next = forkItem.next;

return {
done: true,
Expand All @@ -43,7 +45,7 @@ function sync<T>(iterable: Iterable<T>) {
const item = current?.getNext();

if (isNil(item) || item === forkItem.queue.getTail()) {
const node = forkItem.originNext();
const node = forkItem.next();

current = forkItem.queue.insertLast(node);
isDone = node.done ?? true;
Expand All @@ -67,7 +69,7 @@ function sync<T>(iterable: Iterable<T>) {
const originNext = iterator.next.bind(iterator);
forkItem = {
queue: new LinkedList(),
originNext: originNext,
next: originNext,
};

iterator.next = getNext(forkItem);
Expand All @@ -88,6 +90,7 @@ function sync<T>(iterable: Iterable<T>) {
type ForkAsyncItem = {
queue: LinkedList<IteratorResult<Value>>;
next: (...args: any) => Promise<IteratorResult<Value, any>>;
done: boolean;
};

const forkAsyncMap = new WeakMap<AsyncIterator<Value>, ForkAsyncItem>();
Expand All @@ -96,51 +99,77 @@ function async<T>(iterable: AsyncIterable<T>) {
const iterator = iterable[Symbol.asyncIterator]();

const getNext = (forkItem: ForkAsyncItem) => {
let current: Promise<LinkedListNode<IteratorResult<T>> | null> =
Promise.resolve(forkItem.queue.getLastNode());

const done = () => {
iterator.next = forkItem.next;

return {
done: true,
value: undefined,
} as const;
const settlementQueue: [Resolve<T>, Reject][] = [];
let currentNode: LinkedListNode<IteratorResult<T>> | null =
forkItem.queue.getLastNode() ?? null;
let nextCallCount = 0;
let resolvedCount = 0;
let prevItem = Promise.resolve();

const fillBuffer = (concurrent: Concurrent) => {
const nextItem = forkItem.next(concurrent);

prevItem = prevItem
.then(() => nextItem)
.then(({ done, value }) => {
if (done) {
while (settlementQueue.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const [resolve] = settlementQueue.shift()!;
resolve({ done: true, value: undefined });
}

return void (forkItem.done = true);
}

forkItem.queue.insertLast({ done: false, value });
recur(concurrent);
})
.catch((reason) => {
forkItem.done = true;
while (settlementQueue.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const [, reject] = settlementQueue.shift()!;
reject(reason);
}
});
};

let isDone = false;
const next = async (_concurrent: any) => {
if (isDone) {
return done();
function consumeBuffer() {
while (
forkItem.queue.hasNext(currentNode) &&
nextCallCount > resolvedCount
) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const result = currentNode.getNext()!.getValue()!;
currentNode = currentNode.getNext();

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const [resolve] = settlementQueue.shift()!;
resolve(result);
resolvedCount++;
}
}

function recur(concurrent: Concurrent) {
if (nextCallCount === resolvedCount) {
return;
} else if (forkItem.queue.hasNext(currentNode)) {
consumeBuffer();
} else {
fillBuffer(concurrent);
}
}

const itemCurrent = await current;
const item = itemCurrent?.getNext();
const next = async (concurrent: Concurrent) => {
if (forkItem.done && !forkItem.queue.hasNext(currentNode)) {
return { done: true, value: undefined };
}

nextCallCount++;
return new Promise((resolve, reject) => {
if (isNil(item) || item === forkItem.queue.getTail()) {
return forkItem
.next(_concurrent)
.then((node) => {
current = current.then(() => {
return forkItem.queue.insertLast(node);
});

isDone = node.done ?? true;
if (isDone) {
return resolve(done());
}

return resolve(node);
})
.catch(reject);
}

current = current.then(() => {
return item;
});

resolve(item.getValue());
settlementQueue.push([resolve, reject]);
recur(concurrent);
});
};

Expand All @@ -153,6 +182,7 @@ function async<T>(iterable: AsyncIterable<T>) {
forkItem = {
queue: new LinkedList(),
next: originNext,
done: false,
};

iterator.next = getNext(forkItem) as any;
Expand Down
10 changes: 9 additions & 1 deletion src/dataStructure/linkedList/linkedList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class LinkedList<T> {
}

isEmpty() {
return !this.head.hasNext();
return this.head.getNext() === this.tail;
}

getHead() {
Expand All @@ -71,6 +71,14 @@ export class LinkedList<T> {
return this.tail.getPrev();
}

hasNext(node?: LinkedListNode<T> | null): node is LinkedListNode<T> {
if (node == null) {
return false;
}

return node.getNext() !== this.tail;
}

toArray() {
const arr = [];
let cur = this.head;
Expand Down
79 changes: 78 additions & 1 deletion test/Lazy/fork.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
import { fork, map, pipe, toAsync } from "../../src/index";
import {
concurrent,
delay,
fork,
map,
pipe,
range,
toArray,
toAsync,
} from "../../src/index";

describe("fork", function () {
describe("sync", function () {
Expand Down Expand Up @@ -212,5 +221,73 @@ describe("fork", function () {
expect(await iter3.next()).toEqual({ value: 12, done: false });
expect(await iter4.next()).toEqual({ value: 12, done: false });
});

it("forked iterable should be consumed concurrently", async function () {
const iter = pipe(
toAsync(range(10)),
map((a) => delay(500, a)),
);

const forkedIter = fork(iter);
const arr1 = await pipe(iter, concurrent(5), toArray);
expect(arr1).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

const arr2 = await pipe(forkedIter, concurrent(5), toArray);
expect(arr2).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
}, 1050);

it("forked iterable should be consumed concurrently (forked iterable first)", async function () {
const iter = pipe(
toAsync(range(10)),
map((a) => delay(500, a)),
);

const forkedIter = fork(iter);
const arr1 = await pipe(forkedIter, concurrent(5), toArray);
expect(arr1).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

const arr2 = await pipe(iter, concurrent(5), toArray);
expect(arr2).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
}, 1050);

it("forked iterable and the origin iterable must each be consumed simultaneously.", async function () {
const origin = pipe(
toAsync(range(6)),
map((a) => delay(500, a)),
);

const iter = pipe(origin, concurrent(3));
const forkedIter = pipe(fork(origin), concurrent(3));

const p1 = await iter.next();
const pf1 = await forkedIter.next();
const p2 = await iter.next();
const pf2 = await forkedIter.next();
const p3 = await iter.next();
const pf3 = await forkedIter.next();
const p4 = await iter.next();
const pf4 = await forkedIter.next();
const p5 = await iter.next();
const pf5 = await forkedIter.next();
const p6 = await iter.next();
const pf6 = await forkedIter.next();
const p7 = await iter.next();
const pf7 = await forkedIter.next();

expect(p1).toEqual({ value: 0, done: false });
expect(p2).toEqual({ value: 1, done: false });
expect(p3).toEqual({ value: 2, done: false });
expect(p4).toEqual({ value: 3, done: false });
expect(p5).toEqual({ value: 4, done: false });
expect(p6).toEqual({ value: 5, done: false });
expect(p7).toEqual({ value: undefined, done: true });
expect(pf1).toEqual({ value: 0, done: false });
expect(pf2).toEqual({ value: 1, done: false });
expect(pf3).toEqual({ value: 2, done: false });
expect(pf4).toEqual({ value: 3, done: false });
expect(pf5).toEqual({ value: 4, done: false });
expect(pf6).toEqual({ value: 5, done: false });
expect(pf7).toEqual({ value: undefined, done: true });
}, 1050);
});
});

0 comments on commit d31d011

Please sign in to comment.