Skip to content

Commit

Permalink
feat: async fork
Browse files Browse the repository at this point in the history
  • Loading branch information
ppeeou committed Mar 10, 2024
1 parent a40f6a7 commit ac4b927
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 37 deletions.
167 changes: 140 additions & 27 deletions src/Lazy/fork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,160 @@ type ReturnForkType<A extends Iterable<unknown> | AsyncIterable<unknown>> =

type Value = any;

const forkMap = new WeakMap<
Iterator<Value>,
LinkedList<IteratorResult<Value>>
>();
type ForkItem = {
queue: LinkedList<IteratorResult<Value>>;
originNext: () => IteratorResult<Value, any>;
};

const forkMap = new WeakMap<Iterator<Value>, ForkItem>();

function sync<T>(iterable: Iterable<T>) {
const iterator = iterable[Symbol.iterator]();
let queue = forkMap.get(iterator) as LinkedList<IteratorResult<T>>;
if (!queue) {
queue = new LinkedList();
forkMap.set(iterator, queue);
}

let current: LinkedListNode<IteratorResult<T>> | null = queue.getTail();
let done = false;
const getNext = (forkItem: ForkItem) => {
let current: LinkedListNode<IteratorResult<T>> | null =
forkItem.queue.getLastNode();

return {
[Symbol.iterator]() {
return iterator;
},
const done = () => {
iterator.next = forkItem.originNext;

return {
done: true,
value: undefined,
} as const;
};

next() {
if (done) {
return {
done,
value: undefined,
};
let isDone = false;
const next = () => {
if (isDone) {
return done();
}

const item = current?.getNext();
if (isNil(item)) {
const node = iterator.next();
current = queue.insertLast(node);
done = node.done ?? true;

if (isNil(item) || item === forkItem.queue.getTail()) {
const node = forkItem.originNext();

current = forkItem.queue.insertLast(node);
isDone = node.done ?? true;
if (isDone) {
return done();
}

return node;
}

current = item;
return current.getValue();
};

return next;
};

let forkItem = forkMap.get(iterator) as ForkItem;

if (!forkItem) {
const originNext = iterator.next.bind(iterator);
forkItem = {
queue: new LinkedList(),
originNext: originNext,
};

iterator.next = getNext(forkItem);
forkMap.set(iterator, forkItem);
}

const next = getNext(forkItem);

return {
[Symbol.iterator]() {
return this;
},

next: next,
};
}

type ForkAsyncItem = {
queue: LinkedList<IteratorResult<Value>>;
next: (...args: any) => Promise<IteratorResult<Value, any>>;
};

const forkAsyncMap = new WeakMap<AsyncIterator<Value>, ForkAsyncItem>();

function async<T>(iterable: AsyncIterable<T>) {
const iterator = iterable[Symbol.asyncIterator]();

const getNext = (forkItem: ForkAsyncItem) => {
let current: Promise<LinkedListNode<IteratorResult<T>> | null> =
Promise.resolve(forkItem.queue.getLastNode());

const done = () => {
iterator.next = forkItem.next;

return {
done: true,
value: undefined,
} as const;
};

let isDone = false;
const next = async (_concurrent: any) => {
if (isDone) {
return done();
}

const itemCurrent = await current;
const item = itemCurrent?.getNext();

return new Promise((resolve, reject) => {
if (isNil(item) || item === forkItem.queue.getTail()) {
return forkItem
.next(_concurrent)
.then((node) => {
current = current.then(() => {
return forkItem.queue.insertLast(node);
});

isDone = node.done ?? true;
if (isDone) {
return resolve(done());
}

return resolve(node);
})
.catch(reject);
}

current = current.then(() => {
return item;
});

resolve(item.getValue());
});
};

return next;
};

let forkItem = forkAsyncMap.get(iterator) as ForkAsyncItem;
if (!forkItem) {
const originNext = iterator.next.bind(iterator);
forkItem = {
queue: new LinkedList(),
next: originNext,
};

iterator.next = getNext(forkItem) as any;
forkAsyncMap.set(iterator, forkItem);
}

const next = getNext(forkItem);
return {
[Symbol.asyncIterator]() {
return this;
},
next: next,
};
}

Expand Down Expand Up @@ -101,11 +214,11 @@ function fork<A extends Iterable<unknown> | AsyncIterable<unknown>>(
iterable: A,
): ReturnForkType<A> {
if (isIterable(iterable)) {
return sync(iterable) as ReturnForkType<A>;
return sync(iterable) as any;
}

if (isAsyncIterable(iterable)) {
throw new TypeError("'fork' asyncIterable isn't supported not yet");
return async(iterable) as any;
}

throw new TypeError("'iterable' must be type of Iterable or AsyncIterable");
Expand Down
40 changes: 33 additions & 7 deletions src/dataStructure/linkedList/linkedList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,29 @@ export class LinkedList<T> {

constructor() {
this.head = new LinkedListNode(null as unknown as T);
this.tail = this.head;
this.tail = new LinkedListNode(null as unknown as T);

this.head.setNextNode(this.tail);
this.tail.setPrevNode(this.head);
}

insertFirst(value: T): LinkedListNode<T> {
const node = new LinkedListNode(value);
if (this.isEmpty()) {
this.tail = node;
this.tail.setPrevNode(node);
this.head.setNextNode(node);

node.setNextNode(this.tail);
node.setPrevNode(this.head);
} else {
node.setNextNode(this.head.getNext());
const firstNode = this.head.getNext();
if (!firstNode) {
throw new TypeError("firstNode must be a LinkedListNode");
}

node.setPrevNode(this.head);
node.setNextNode(firstNode);
firstNode.setPrevNode(node);
this.head.setNextNode(node);
}

Expand All @@ -28,8 +41,17 @@ export class LinkedList<T> {
}

const node = new LinkedListNode(value);
this.tail?.setNextNode(node);
this.tail = node;
const lastNode = this.tail.getPrev();

if (!lastNode) {
throw new TypeError("lastNode must be a LinkedListNode");
}

node.setPrevNode(lastNode);
node.setNextNode(this.tail);
lastNode.setNextNode(node);
this.tail.setPrevNode(node);

return node;
}

Expand All @@ -45,13 +67,17 @@ export class LinkedList<T> {
return this.tail;
}

getLastNode() {
return this.tail.getPrev();
}

toArray() {
const arr = [];
let cur = this.head;
while (cur.hasNext()) {
while (cur.getNext() !== this.tail) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
cur = cur.getNext()!;
arr.push(cur);
arr.push(cur.getValue());
}

return arr;
Expand Down
14 changes: 13 additions & 1 deletion src/dataStructure/linkedList/linkedListNode.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
export class LinkedListNode<T> {
private value: T;
private next: LinkedListNode<T> | null;
private prev: LinkedListNode<T> | null;

constructor(value: T) {
this.value = value;
this.next = null;
this.prev = null;
}

setNextNode(node: LinkedListNode<T> | null) {
setNextNode(node: LinkedListNode<T>) {
this.next = node;

return node;
}

setPrevNode(node: LinkedListNode<T>) {
this.prev = node;

return node;
}

getValue() {
return this.value;
}
Expand All @@ -21,6 +29,10 @@ export class LinkedListNode<T> {
return this.next;
}

getPrev() {
return this.prev;
}

hasNext() {
return this.next instanceof LinkedListNode;
}
Expand Down
Loading

0 comments on commit ac4b927

Please sign in to comment.