-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
142 lines (122 loc) · 4.6 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/**
* @typedef {Object} ResultPromisePoolRL
* @property {number} executionTimeMs - The total execution time
* @property {number} startedCount - The total started calls
* @property {number} failCount - The number of failed calls
* @property {number} averageRateHz - The overal average rate of call per seconds
* @property {Number} remaingConcurentCalls - The number of calls not completed - always 0 on success
*/
/**
* @typedef {Function} OnErrorPromisePoolRL
* @param {Error} error - The error
* @param {Promise} promise - The promise (async call which failed)
* @param {ResultPromisePoolRL} result - {executionTimeMs, startedCount, failCount, averageRateHz, remaingConcurentCalls}
* @return {boolean} if `true` calls will continues without failing
*/
/**
*
* @param {Function} next when called returns a Promise or false when done;
* @param {number} maxConcurent maximum size of the pool of running Promises
* @param {Object} [options]
* @param {number | null} [options.rateHz] maximum next() rate in Hz (call / second)
* @param {OnErrorPromisePoolRL} [options.onError] onError(error, promise) will be called when an error occurs. If not defined the process will stop.
* @returns {ResultPromisePoolRL} - {executionTimeMs, startedCount, failCount, averageRateHz, remaingConcurentCalls}
*/
function PromisePoolRL(next, maxConcurent, options) {
options = options || {};
let expectedDuration = 0;
if (options.rateHz) { // covers 0, null..
// standard wait depend on the number of concurent call
expectedDuration = 1000 * maxConcurent / options.rateHz ;
}
const startTime = Date.now();
let startedCount = 0;
let failCount = 0;
return new Promise((resolve, reject) => {
const durations = [];
const durationsMemorySize = maxConcurent;
// keep track of rejection to avoid multiple callback
let rejected = false;
function gotError(err, promise) {
failCount++;
if (options.onError !== undefined) {
const doNotFail = options.onError(err, promise, getResult());
if (doNotFail) return true;
}
if (! rejected) {
err.promisePoolRLResult = getResult();
reject(err);
}
rejected = true;
return false;
}
let countConcurentCalls = 0;
let done = false;
function goNext() {
if (done || rejected) {
if (countConcurentCalls === 0 && ! rejected) {
resolve(getResult());
}
return false;
}
// get next Promise to execute or goNext() if done to enventually resolve
const n = next();
if (! n) { done = true; return goNext(); }
countConcurentCalls++;
async function nn () { // async function to be able to delay
startedCount++;
// --- rate limiting ---//
const delay = getCurrentDuration();
if (delay > 0) await new Promise((r) => { setTimeout(r, delay) });
// --- end rate limiting --//
const callStartTime = Date.now();
n().then(() => { // resolve
addMyDuration(Date.now() - callStartTime);
countConcurentCalls--; goNext()
},
(e) => { // error
if ( gotError(e, n) ) { // continue
countConcurentCalls--; goNext()
}
});
};
nn();
return true;
}
// start by filling up the pool
for (let i = 0; i < maxConcurent; i++) {
if (! goNext(i * expectedDuration)) break;
}
// --- delay calculation ---- //
/**
* Calculate the average duration of "n" last calls and returns expected wait time
* @returns {number} - the delay to apply
*/
function getCurrentDuration() {
if (! options.rateHz) return 0;
if (durations.length < durationsMemorySize) return expectedDuration;
const averageDuration = durations.reduce((a, b) => a + b, 0) / durations.length;
return Math.max(0, expectedDuration - averageDuration);
}
/**
* records "n" last duration
* @param {number} duration
* @returns
*/
function addMyDuration(duration) {
if (! options.rateHz) return;
if (durations.length > durationsMemorySize) durations.shift();
durations.push(duration);
}
/**
* Get the result object, used by resolve and reject
* @returns {ResultPromisePoolRL}
*/
function getResult() {
const executionTimeMs = Date.now() - startTime;
const averageRateHz = 1000 * ( startedCount + failCount ) / executionTimeMs;
return {executionTimeMs, startedCount, failCount, averageRateHz, remaingConcurentCalls: countConcurentCalls};
}
});
}
module.exports = PromisePoolRL;