From 3e4fdd16df0461e56e58cc32686d3011f5e2b461 Mon Sep 17 00:00:00 2001 From: tuhm1 <50200070+tuhm1@users.noreply.github.com> Date: Mon, 8 Jul 2024 07:02:18 -0500 Subject: [PATCH] Allow changing concurrency limit (#79) --- index.d.ts | 5 +++++ index.js | 27 +++++++++++++++++++++++---- readme.md | 4 ++++ test.js | 40 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 4 deletions(-) diff --git a/index.d.ts b/index.d.ts index 9351750..038ba41 100644 --- a/index.d.ts +++ b/index.d.ts @@ -9,6 +9,11 @@ export type LimitFunction = { */ readonly pendingCount: number; + /** + Get or set the concurrency limit. + */ + concurrency: number; + /** Discard pending promises that are waiting to run. diff --git a/index.js b/index.js index d2c48b5..484ad3e 100644 --- a/index.js +++ b/index.js @@ -1,15 +1,13 @@ import Queue from 'yocto-queue'; export default function pLimit(concurrency) { - if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { - throw new TypeError('Expected `concurrency` to be a number from 1 and up'); - } + validateConcurrency(concurrency); const queue = new Queue(); let activeCount = 0; const resumeNext = () => { - if (queue.size > 0) { + if (activeCount < concurrency && queue.size > 0) { queue.dequeue()(); // Since `pendingCount` has been decreased by one, increase `activeCount` by one. activeCount++; @@ -72,7 +70,28 @@ export default function pLimit(concurrency) { queue.clear(); }, }, + concurrency: { + get: () => concurrency, + + set(newConcurrency) { + validateConcurrency(newConcurrency); + concurrency = newConcurrency; + + queueMicrotask(() => { + // eslint-disable-next-line no-unmodified-loop-condition + while (activeCount < concurrency && queue.size > 0) { + resumeNext(); + } + }); + }, + }, }); return generator; } + +function validateConcurrency(concurrency) { + if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { + throw new TypeError('Expected `concurrency` to be a number from 1 and up'); + } +} diff --git a/readme.md b/readme.md index 4e890f8..1d73ae1 100644 --- a/readme.md +++ b/readme.md @@ -74,6 +74,10 @@ This might be useful if you want to teardown the queue at the end of your progra Note: This does not cancel promises that are already running. +### limit.concurrency + +Get or set the concurrency limit. + ## FAQ ### How is this different from the [`p-queue`](https://github.com/sindresorhus/p-queue) package? diff --git a/test.js b/test.js index 39cef54..08bb306 100644 --- a/test.js +++ b/test.js @@ -177,3 +177,43 @@ test('throws on invalid concurrency argument', t => { pLimit(true); }); }); + +test('change concurrency to smaller value', async t => { + const limit = pLimit(4); + let running = 0; + const log = []; + const promises = Array.from({length: 10}).map(() => + limit(async () => { + ++running; + log.push(running); + await delay(50); + --running; + }), + ); + await delay(0); + t.is(running, 4); + + limit.concurrency = 2; + await Promise.all(promises); + t.deepEqual(log, [1, 2, 3, 4, 2, 2, 2, 2, 2, 2]); +}); + +test('change concurrency to bigger value', async t => { + const limit = pLimit(2); + let running = 0; + const log = []; + const promises = Array.from({length: 10}).map(() => + limit(async () => { + ++running; + log.push(running); + await delay(50); + --running; + }), + ); + await delay(0); + t.is(running, 2); + + limit.concurrency = 4; + await Promise.all(promises); + t.deepEqual(log, [1, 2, 3, 4, 4, 4, 4, 4, 4, 4]); +});