Skip to content

Commit

Permalink
Fix the concurrency option
Browse files Browse the repository at this point in the history
Fixes #6
Closes #7
  • Loading branch information
sindresorhus committed Mar 5, 2020
1 parent 9d8e204 commit 3bbb409
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 55 deletions.
13 changes: 7 additions & 6 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,23 @@ declare namespace pSettle {
Must be an integer from 1 and up or `Infinity`.
Note: This only limits concurrency for elements that are async functions, not promises.
@default Infinity
*/
readonly concurrency?: number;
}

type PromiseResult<ValueType> = pReflect.PromiseResult<ValueType>;
type PromiseFulfilledResult<ValueType> = pReflect.PromiseFulfilledResult<
ValueType
>;
type PromiseFulfilledResult<ValueType> = pReflect.PromiseFulfilledResult<ValueType>;
type PromiseRejectedResult = pReflect.PromiseRejectedResult;
}

/**
Settle promises concurrently and get their fulfillment value or rejection reason.
@returns A promise that is fulfilled when all input `promises` are settled.
@param array - Can contain a mix of any value, promise, and async function. Promises are awaited. Async functions are executed and awaited. The `concurrency` option only works for elements that are async functions.
@returns A promise that is fulfilled when all promises from the `array` argument are settled.
@example
```
Expand Down Expand Up @@ -52,8 +53,8 @@ import pSettle = require('p-settle');
```
*/
declare function pSettle<ValueType>(
promises: ReadonlyArray<ValueType | PromiseLike<ValueType>>,
array: ReadonlyArray<ValueType | PromiseLike<ValueType> | ((...args: any[]) => PromiseLike<ValueType>)>,
options?: pSettle.Options
): Promise<pSettle.PromiseResult<ValueType>[]>;
): Promise<Array<pSettle.PromiseResult<ValueType>>>;

export = pSettle;
15 changes: 10 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ const pLimit = require('p-limit');

module.exports = async (array, options = {}) => {
const {concurrency = Infinity} = options;
const limit = pLimit(concurrency);

if (array.find(element => typeof element.then === 'function') && concurrency) {
throw new Error('Cannot limit concurrency for promises')
}
return Promise.all(array.map(element => {
if (typeof element.then === 'function') { // eslint-disable-line promise/prefer-await-to-then
return pReflect(element);
}

const limit = pLimit(concurrency);
if (typeof element === 'function') {
return pReflect(limit(() => element()));
}

return Promise.all(promises.map(item => pReflect(limit(() => item))));
return pReflect(Promise.resolve(element));
}));
};
4 changes: 2 additions & 2 deletions index.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {expectType} from 'tsd';
import pSettle = require('.');
import {PromiseResult} from '.';

expectType<Promise<PromiseResult<number>[]>>(pSettle([1, Promise.resolve(2)]));
expectType<Promise<PromiseResult<number>[]>>(
expectType<Promise<Array<PromiseResult<number>>>>(pSettle([1, Promise.resolve(2)]));
expectType<Promise<Array<PromiseResult<number>>>>(
pSettle([1, Promise.resolve(2)], {concurrency: 1})
);
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
"devDependencies": {
"ava": "^1.4.1",
"delay": "^4.1.0",
"mock-require": "^3.0.3",
"in-range": "^2.0.0",
"time-span": "^3.1.0",
"tsd": "^0.11.0",
"xo": "^0.27.2"
}
Expand Down
12 changes: 8 additions & 4 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ const pSettle = require('p-settle');

## API

### pSettle(promises, options?)
### pSettle(array, options?)

Returns a `Promise<Object[]>` that is fulfilled when all promises in `promises` are settled.
Returns a `Promise<object[]>` that is fulfilled when all promises from the `array` argument are settled.

The objects in the array have the following properties:

- `isFulfilled`
- `isRejected`
- `value` or `reason` *(Depending on whether the promise fulfilled or rejected)*

#### promises
#### array

Type: `Promise<unknown>[]`
Type: `Array<ValueType | PromiseLike<ValueType> | ((...args: any[]) => PromiseLike<ValueType>)>`

The array can contain a mix of any value, promise, and async function. Promises are awaited. Async functions are executed and awaited. The `concurrency` option only works for elements that are async functions.

#### options

Expand All @@ -66,6 +68,8 @@ Minimum: `1`

Number of concurrently pending promises.

**Note:** This only limits concurrency for elements that are async functions, not promises.

## Related

- [p-reflect](https://github.com/sindresorhus/p-reflect) - Make a promise always fulfill with its actual fulfillment value or rejection reason
Expand Down
77 changes: 40 additions & 37 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,8 @@
import test from 'ava';
import delay from 'delay';
import mock from 'mock-require';
import realPLimit from 'p-limit';

let limitCalls = [];
mock('p-limit', concurrency => {
const limit = realPLimit(concurrency);

const mockLimit = itemFunction => {
limitCalls.push({
concurrency,
item: itemFunction()
});

return limit(itemFunction);
};

return mockLimit;
});

const pSettle = mock.reRequire('.');
import inRange from 'in-range';
import timeSpan from 'time-span';
import pSettle from '.';

test('main', async t => {
t.deepEqual(
Expand All @@ -45,26 +28,46 @@ test('main', async t => {
);
});

test('concurrency and item are passed to p-limit', async t => {
limitCalls = [];
test('concurrency option works', async t => {
const fixture = [
async () => {
await delay(300);
return 10;
},
async () => {
await delay(200);
return 20;
},
async () => {
await delay(100);
return 30;
}
];

const arraySize = 100;
const concurrency = 4;
const array = new Array(arraySize).fill(0).map((_, index) => Promise.resolve(index));
const resolvedCalls = new Array(arraySize).fill(0).map(() => ({concurrency}));
const end = timeSpan();

await pSettle(array, {concurrency});

await limitCalls.map(limitCall => limitCall.item).forEach((item, index) => {
item.then(data => {
resolvedCalls[index].item = data;
});
});
t.deepEqual(
await pSettle(fixture, {concurrency: 1}),
[
{
isFulfilled: true,
isRejected: false,
value: 10
},
{
isFulfilled: true,
isRejected: false,
value: 20
},
{
isFulfilled: true,
isRejected: false,
value: 30
}
]
);

await t.deepEqual(resolvedCalls, new Array(arraySize).fill(0).map((_, item) => ({
concurrency,
item
})));
t.true(inRange(end(), {start: 590, end: 760}));
});

test('handles empty iterable', async t => {
Expand Down

0 comments on commit 3bbb409

Please sign in to comment.