Skip to content

Commit

Permalink
[node-core-library] Weighted async concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
dmichon-msft committed May 1, 2023
1 parent e53e92d commit a10afdd
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Support variable concurrency based on task weights in `Async.forEachAsync`.",
"type": "minor"
}
],
"packageName": "@rushstack/node-core-library"
}
11 changes: 10 additions & 1 deletion common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ export class AnsiEscape {

// @beta
export class Async {
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: IAsyncParallelismOptions | undefined): Promise<void>;
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: TEntry extends {
weight?: number;
} ? IAsyncParallelismOptionsWithWeight : IAsyncParallelismOptions | undefined): Promise<void>;
static mapAsync<TEntry, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: IAsyncParallelismOptions | undefined): Promise<TRetVal[]>;
static runWithRetriesAsync<TResult>({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions<TResult>): Promise<TResult>;
static sleep(ms: number): Promise<void>;
Expand Down Expand Up @@ -319,6 +321,13 @@ export interface IAsyncParallelismOptions {
concurrency?: number;
}

// Warning: (ae-incompatible-release-tags) The symbol "IAsyncParallelismOptionsWithWeight" is marked as @public, but its signature references "IAsyncParallelismOptions" which is marked as @beta
//
// @public (undocumented)
export interface IAsyncParallelismOptionsWithWeight extends IAsyncParallelismOptions {
useWeight?: boolean;
}

// @beta (undocumented)
export interface IColorableSequence {
// (undocumented)
Expand Down
25 changes: 22 additions & 3 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ export interface IAsyncParallelismOptions {
concurrency?: number;
}

export interface IAsyncParallelismOptionsWithWeight extends IAsyncParallelismOptions {
/**
* If set, then will use `item.weight` instead of `1` as the parallelism consumed by a given iteration.
*/
useWeight?: boolean;
}

/**
* @remarks
* Used with {@link Async.runWithRetriesAsync}.
Expand Down Expand Up @@ -94,12 +101,17 @@ export class Async {
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
options?: TEntry extends { weight?: number }
? IAsyncParallelismOptionsWithWeight
: IAsyncParallelismOptions | undefined
): Promise<void> {
await new Promise<void>((resolve: () => void, reject: (error: Error) => void) => {
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let operationsInProgress: number = 0;
let usedCapacity: number = 0;

const useWeight: boolean | undefined = (options as IAsyncParallelismOptionsWithWeight)?.useWeight;

const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
Expand All @@ -111,17 +123,23 @@ export class Async {
let promiseHasResolvedOrRejected: boolean = false;

async function queueOperationsAsync(): Promise<void> {
while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) {
while (usedCapacity < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) {
// Increment the concurrency while waiting for the iterator.
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
usedCapacity++;
operationsInProgress++;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

if (!iteratorIsComplete) {
Promise.resolve(callback(currentIteratorResult.value, arrayIndex++))
const { value } = currentIteratorResult;
const weight: number = useWeight ? (value as { weight?: number })?.weight ?? 1 : 1;
// Was already incremented by 1
usedCapacity += weight - 1;
Promise.resolve(callback(value, arrayIndex++))
.then(async () => {
usedCapacity -= weight;
operationsInProgress--;
await onOperationCompletionAsync();
})
Expand All @@ -132,6 +150,7 @@ export class Async {
} else {
// The iterator is complete and there wasn't a value, so untrack the waiting state.
operationsInProgress--;
usedCapacity--;
}
}

Expand Down
8 changes: 7 additions & 1 deletion libraries/node-core-library/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

export { AlreadyReportedError } from './AlreadyReportedError';
export { AnsiEscape, IAnsiEscapeConvertForTestsOptions } from './Terminal/AnsiEscape';
export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async';
export {
Async,
AsyncQueue,
IAsyncParallelismOptions,
IAsyncParallelismOptionsWithWeight,
IRunWithRetriesOptions
} from './Async';
export { Brand } from './PrimitiveTypes';
export { FileConstants, FolderConstants } from './Constants';
export { Enum } from './Enum';
Expand Down
49 changes: 49 additions & 0 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,55 @@ describe(Async.name, () => {
await finalPromise;
});

it('if concurrency and useWeight are set, ensures no more than N/weight operations occur in parallel', async () => {
interface INumberWithWeight {
n: number;
weight: number;
}

interface ITestCase {
concurrency: number;
weight: number;
expectedConcurrency: number;
}

const cases: ITestCase[] = [
{
concurrency: 2,
weight: 0.5,
expectedConcurrency: 4
},
{
concurrency: 1,
weight: 0.4,
expectedConcurrency: 3
},
{
concurrency: 1,
weight: 0.17,
expectedConcurrency: 6
}
];

for (const { concurrency, weight, expectedConcurrency } of cases) {
let running: number = 0;
let maxRunning: number = 0;

const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ n, weight }));

const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
running++;
await Async.sleep(1);
maxRunning = Math.max(maxRunning, running);
running--;
});

await Async.forEachAsync(array, fn, { concurrency, useWeight: true });
expect(fn).toHaveBeenCalledTimes(8);
expect(maxRunning).toEqual(expectedConcurrency);
}
});

it('rejects if an async iterator rejects', async () => {
const expectedError: Error = new Error('iterator error');
let iteratorIndex: number = 0;
Expand Down

0 comments on commit a10afdd

Please sign in to comment.