diff --git a/common/changes/@rushstack/node-core-library/weighted-concurrency_2023-05-01-19-41.json b/common/changes/@rushstack/node-core-library/weighted-concurrency_2023-05-01-19-41.json new file mode 100644 index 00000000000..8036753ea8e --- /dev/null +++ b/common/changes/@rushstack/node-core-library/weighted-concurrency_2023-05-01-19-41.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "Support variable concurrency based on task weights in `Async.forEachAsync`.", + "type": "minor" + } + ], + "packageName": "@rushstack/node-core-library" +} \ No newline at end of file diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index f50b555801f..2fbc7f2743c 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -33,7 +33,9 @@ export class AnsiEscape { // @beta export class Async { - static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: TEntry extends { + weight?: number; + } ? IAsyncParallelismOptionsWithWeight : IAsyncParallelismOptions | undefined): Promise; static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; static sleep(ms: number): Promise; @@ -319,6 +321,13 @@ export interface IAsyncParallelismOptions { concurrency?: number; } +// Warning: (ae-incompatible-release-tags) The symbol "IAsyncParallelismOptionsWithWeight" is marked as @public, but its signature references "IAsyncParallelismOptions" which is marked as @beta +// +// @public (undocumented) +export interface IAsyncParallelismOptionsWithWeight extends IAsyncParallelismOptions { + useWeight?: boolean; +} + // @beta (undocumented) export interface IColorableSequence { // (undocumented) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 5f960409cc0..612dc616d80 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -17,6 +17,13 @@ export interface IAsyncParallelismOptions { concurrency?: number; } +export interface IAsyncParallelismOptionsWithWeight extends IAsyncParallelismOptions { + /** + * If set, then will use `item.weight` instead of `1` as the parallelism consumed by a given iteration. + */ + useWeight?: boolean; +} + /** * @remarks * Used with {@link Async.runWithRetriesAsync}. @@ -94,12 +101,17 @@ export class Async { public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, - options?: IAsyncParallelismOptions | undefined + options?: TEntry extends { weight?: number } + ? IAsyncParallelismOptionsWithWeight + : IAsyncParallelismOptions | undefined ): Promise { await new Promise((resolve: () => void, reject: (error: Error) => void) => { const concurrency: number = options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; let operationsInProgress: number = 0; + let usedCapacity: number = 0; + + const useWeight: boolean | undefined = (options as IAsyncParallelismOptionsWithWeight)?.useWeight; const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || @@ -111,17 +123,23 @@ export class Async { let promiseHasResolvedOrRejected: boolean = false; async function queueOperationsAsync(): Promise { - while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) { + while (usedCapacity < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) { // Increment the concurrency while waiting for the iterator. // This function is reentrant, so this ensures that at most `concurrency` executions are waiting + usedCapacity++; operationsInProgress++; const currentIteratorResult: IteratorResult = await iterator.next(); // eslint-disable-next-line require-atomic-updates iteratorIsComplete = !!currentIteratorResult.done; if (!iteratorIsComplete) { - Promise.resolve(callback(currentIteratorResult.value, arrayIndex++)) + const { value } = currentIteratorResult; + const weight: number = useWeight ? (value as { weight?: number })?.weight ?? 1 : 1; + // Was already incremented by 1 + usedCapacity += weight - 1; + Promise.resolve(callback(value, arrayIndex++)) .then(async () => { + usedCapacity -= weight; operationsInProgress--; await onOperationCompletionAsync(); }) @@ -132,6 +150,7 @@ export class Async { } else { // The iterator is complete and there wasn't a value, so untrack the waiting state. operationsInProgress--; + usedCapacity--; } } diff --git a/libraries/node-core-library/src/index.ts b/libraries/node-core-library/src/index.ts index 87cf3877c4d..ef5ff7b2e61 100644 --- a/libraries/node-core-library/src/index.ts +++ b/libraries/node-core-library/src/index.ts @@ -9,7 +9,13 @@ export { AlreadyReportedError } from './AlreadyReportedError'; export { AnsiEscape, IAnsiEscapeConvertForTestsOptions } from './Terminal/AnsiEscape'; -export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async'; +export { + Async, + AsyncQueue, + IAsyncParallelismOptions, + IAsyncParallelismOptionsWithWeight, + IRunWithRetriesOptions +} from './Async'; export { Brand } from './PrimitiveTypes'; export { FileConstants, FolderConstants } from './Constants'; export { Enum } from './Enum'; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 14cd54a6fb4..e3aab6c98d3 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -291,6 +291,55 @@ describe(Async.name, () => { await finalPromise; }); + it('if concurrency and useWeight are set, ensures no more than N/weight operations occur in parallel', async () => { + interface INumberWithWeight { + n: number; + weight: number; + } + + interface ITestCase { + concurrency: number; + weight: number; + expectedConcurrency: number; + } + + const cases: ITestCase[] = [ + { + concurrency: 2, + weight: 0.5, + expectedConcurrency: 4 + }, + { + concurrency: 1, + weight: 0.4, + expectedConcurrency: 3 + }, + { + concurrency: 1, + weight: 0.17, + expectedConcurrency: 6 + } + ]; + + for (const { concurrency, weight, expectedConcurrency } of cases) { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ n, weight })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(1); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency, useWeight: true }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(expectedConcurrency); + } + }); + it('rejects if an async iterator rejects', async () => { const expectedError: Error = new Error('iterator error'); let iteratorIndex: number = 0;