Skip to content

Commit

Permalink
throttled node check to allow only N active requests
Browse files Browse the repository at this point in the history
  • Loading branch information
afostr committed Jun 10, 2024
1 parent fb7ba4c commit 515ec7e
Showing 1 changed file with 79 additions and 2 deletions.
81 changes: 79 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,38 @@ export async function updateNodeList(tryInfinate = false): Promise<void> {
})
}
if (config.filterDeadNodesFromArchiver) {
const promises = nodes.map(checkIfNodeIsActive)
const results = await Promise.all(promises)
//const promises = nodes.map(checkIfNodeIsActive) //dont blast all requests at once

const concurrentRequests = 50
const semaphore = new Semaphore(concurrentRequests);
const results: boolean[] = new Array(nodes.length).fill(false);

const waitForAllPromise = new Deferred<void>()
let finished = 0
for(let i=0; i<nodes.length; i++) {
const node = nodes[i]
//promises.push(checkIfNodeIsActive(node)) //not stacking up promises
//@ts-ignore what are we using es5 for?
async function throttledNodeCheck(node: Node, index: number) {
await semaphore.wait()
try {
let res = await checkIfNodeIsActive(node)
results[index] = res
} finally {
semaphore.signal()
finished++
if (finished >= nodes.length) {
waitForAllPromise.resolve()
}
}
}
throttledNodeCheck(node, i)
}

//wait for finished count
await waitForAllPromise

//const results = await Promise.all(promises)
const activeNodes = nodes.filter((_, index) => results[index])
nodeList = activeNodes
if (verbose)
Expand Down Expand Up @@ -1773,3 +1803,50 @@ export function hexToBN(hexString: string): BN {
}
return new BN(hexString, 16)
}


class Semaphore {
private queue: (() => void)[] = [];
private value: number;

constructor(maxConcurrency: number) {
this.value = maxConcurrency;
}

async wait(): Promise<void> {
return new Promise<void>((resolve) => {
const tryAcquire = () => {
if (this.value > 0) {
this.value--;
resolve();
} else {
this.queue.push(tryAcquire);
}
};
tryAcquire();
});
}

signal(): void {
this.value++;
if (this.queue.length > 0) {
const next = this.queue.shift();
if (next) {
next();
}
}
}
}

class Deferred<T> {
public promise: Promise<T>;
public resolve!: (value: T | PromiseLike<T>) => void;
public reject!: (reason?: any) => void;

constructor() {
this.promise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
}

0 comments on commit 515ec7e

Please sign in to comment.