From 2aa332cfe7ff8641abafcb7aa4ae1a988f2b909b Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 11:52:51 -0400 Subject: [PATCH 01/11] fix weighting --- libraries/node-core-library/src/Async.ts | 140 ++++++++++-------- .../logic/operations/AsyncOperationQueue.ts | 1 + .../operations/OperationExecutionRecord.ts | 6 +- .../operations/WeightedOperationPlugin.ts | 1 + 4 files changed, 81 insertions(+), 67 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 400cb8e00d8..decf28c76ad 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -73,6 +73,34 @@ function toWeightedIterator( }; } +const createLock = () => { + const queue: Array<() => Promise> = []; + let active = false; + return (fn: () => Promise) => { + let deferredResolve: (val: T) => void; + let deferredReject: () => void; + const deferred = new Promise((resolve, reject) => { + deferredResolve = resolve; + deferredReject = reject; + }); + const exec = async () => { + await fn().then(deferredResolve, deferredReject); + if (queue.length > 0) { + queue.shift()?.(); + } else { + active = false; + } + }; + if (active) { + queue.push(exec); + } else { + active = true; + exec(); + } + return deferred; + }; +}; + /** * Utilities for parallel asynchronous operations, for use with the system `Promise` APIs. * @@ -155,77 +183,59 @@ export class Async { callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { - await new Promise((resolve: () => void, reject: (error: Error) => void) => { - const concurrency: number = - options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; - let concurrentUnitsInProgress: number = 0; - - const iterator: Iterator | AsyncIterator = ( - (iterable as Iterable)[Symbol.iterator] || - (iterable as AsyncIterable)[Symbol.asyncIterator] - ).call(iterable); - - let arrayIndex: number = 0; - let iteratorIsComplete: boolean = false; - let promiseHasResolvedOrRejected: boolean = false; - - async function queueOperationsAsync(): Promise { - while ( - concurrentUnitsInProgress < concurrency && - !iteratorIsComplete && - !promiseHasResolvedOrRejected - ) { - // Increment the concurrency while waiting for the iterator. - // This function is reentrant, so this ensures that at most `concurrency` executions are waiting - concurrentUnitsInProgress++; - const currentIteratorResult: IteratorResult = await iterator.next(); - // eslint-disable-next-line require-atomic-updates - iteratorIsComplete = !!currentIteratorResult.done; - - if (!iteratorIsComplete) { - const currentIteratorValue: TEntry = currentIteratorResult.value; - Async.validateWeightedIterable(currentIteratorValue); - const weight: number = Math.min(currentIteratorValue.weight, concurrency); - // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. - // Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted - // operations are present. - concurrentUnitsInProgress += weight - 1; - Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) - .then(async () => { - concurrentUnitsInProgress -= weight; - await onOperationCompletionAsync(); - }) - .catch((error) => { - promiseHasResolvedOrRejected = true; - reject(error); - }); - } else { - // The iterator is complete and there wasn't a value, so untrack the waiting state. - concurrentUnitsInProgress--; - } - } + const concurrency: number = + options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; + let concurrentUnitsInProgress: number = 0; + + const iterator: Iterator | AsyncIterator = ( + (iterable as Iterable)[Symbol.iterator] || + (iterable as AsyncIterable)[Symbol.asyncIterator] + ).call(iterable); + + let arrayIndex: number = 0; + + const pending: Set> = new Set(); + + const lock = createLock(); + let iteratorIsDone = false; - if (iteratorIsComplete) { - await onOperationCompletionAsync(); + // eslint-disable-next-line no-constant-condition + while (true) { + await lock(async () => { + if (concurrentUnitsInProgress >= concurrency) { + return; } - } + const currentIteratorResult: IteratorResult = await iterator.next(); + console.log('iterator result', currentIteratorResult.done, currentIteratorResult.value.element?.name); + if (currentIteratorResult.done) { + iteratorIsDone = true; + return; + } + + const { element, weight } = currentIteratorResult.value; + + concurrentUnitsInProgress += weight; + const promise: Promise = Promise.resolve(callback(element, arrayIndex++)).then(() => { + console.log('resolved', (element as any).name); + concurrentUnitsInProgress -= weight; + pending.delete(promise); + }); + pending.add(promise); - async function onOperationCompletionAsync(): Promise { - if (!promiseHasResolvedOrRejected) { - if (concurrentUnitsInProgress === 0 && iteratorIsComplete) { - promiseHasResolvedOrRejected = true; - resolve(); - } else if (!iteratorIsComplete) { - await queueOperationsAsync(); - } + // eslint-disable-next-line no-unmodified-loop-condition + while (concurrentUnitsInProgress >= concurrency && pending.size > 0) { + await Promise.race(Array.from(pending)); } + }); + + if (iteratorIsDone) { + break; } + } - queueOperationsAsync().catch((error) => { - promiseHasResolvedOrRejected = true; - reject(error); - }); - }); + if (pending.size > 0) { + await Promise.all(Array.from(pending)); + } } /** diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index 95cda7813e8..c68da9e0328 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -138,6 +138,7 @@ export class AsyncOperationQueue // This task is ready to process, hand it to the iterator. // Needs to have queue semantics, otherwise tools that iterate it get confused record.status = OperationStatus.Queued; + console.log('queueing', record.weight, record.name); waitingIterators.shift()!({ value: record, done: false diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts index 5846eddd483..e9e172c63bd 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts @@ -96,7 +96,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext { public readonly stdioSummarizer: StdioSummarizer = new StdioSummarizer(); public readonly runner: IOperationRunner; - public readonly weight: number; public readonly associatedPhase: IPhase | undefined; public readonly associatedProject: RushConfigurationProject | undefined; public readonly _operationMetadataManager: OperationMetadataManager | undefined; @@ -117,7 +116,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext { this.operation = operation; this.runner = runner; - this.weight = operation.weight; this.associatedPhase = associatedPhase; this.associatedProject = associatedProject; if (operation.associatedPhase && operation.associatedProject) { @@ -134,6 +132,10 @@ export class OperationExecutionRecord implements IOperationRunnerContext { return this.runner.name; } + public get weight(): number { + return this.operation.weight; + } + public get debugMode(): boolean { return this._context.debugMode; } diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts index c33e5971542..1880f28961d 100644 --- a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -44,6 +44,7 @@ function weightOperations( operation.weight = operationSettings.weight; } } + console.log('Operation weight:', operation.name, operation.weight); Async.validateWeightedIterable(operation); } return operations; From 1a677ba6d97fe027e6ba82363c1fa8b9b384720d Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 12:30:36 -0400 Subject: [PATCH 02/11] adjustments --- libraries/node-core-library/src/Async.ts | 49 ++++++++----------- .../node-core-library/src/test/Async.test.ts | 43 ++++++++++++++++ .../operations/OperationExecutionManager.ts | 1 + 3 files changed, 65 insertions(+), 28 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index decf28c76ad..cc7efdc6381 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -196,40 +196,33 @@ export class Async { const pending: Set> = new Set(); - const lock = createLock(); - let iteratorIsDone = false; - // eslint-disable-next-line no-constant-condition while (true) { - await lock(async () => { - if (concurrentUnitsInProgress >= concurrency) { - return; - } - const currentIteratorResult: IteratorResult = await iterator.next(); - console.log('iterator result', currentIteratorResult.done, currentIteratorResult.value.element?.name); - if (currentIteratorResult.done) { - iteratorIsDone = true; - return; - } - - const { element, weight } = currentIteratorResult.value; + const currentIteratorResult: IteratorResult = await iterator.next(); + console.log( + 'iterator result', + currentIteratorResult.done, + currentIteratorResult.value.element?.name, + currentIteratorResult.value.element?.weight, + concurrency + ); + if (currentIteratorResult.done) { + break; + } - concurrentUnitsInProgress += weight; - const promise: Promise = Promise.resolve(callback(element, arrayIndex++)).then(() => { - console.log('resolved', (element as any).name); - concurrentUnitsInProgress -= weight; - pending.delete(promise); - }); - pending.add(promise); + const { element, weight } = currentIteratorResult.value; - // eslint-disable-next-line no-unmodified-loop-condition - while (concurrentUnitsInProgress >= concurrency && pending.size > 0) { - await Promise.race(Array.from(pending)); - } + concurrentUnitsInProgress += weight; + const promise: Promise = Promise.resolve(callback(element, arrayIndex++)).then(() => { + console.log('resolved', (element as any).name); + concurrentUnitsInProgress -= weight; + pending.delete(promise); }); + pending.add(promise); - if (iteratorIsDone) { - break; + // eslint-disable-next-line no-unmodified-loop-condition + while (concurrentUnitsInProgress >= concurrency && pending.size > 0) { + await Promise.race(Array.from(pending)); } } diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 0060f338050..3dc57b4ad17 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -494,6 +494,49 @@ describe(Async.name, () => { expect(fn).toHaveBeenCalledTimes(10); expect(maxRunning).toEqual(9); }); + + it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => { + let waitingIterators: number = 0; + + let resolve2!: (value: { done: true; value: undefined }) => void; + const signal2: Promise<{ done: true; value: undefined }> = new Promise((resolve, reject) => { + resolve2 = resolve; + }); + + let iteratorIndex: number = 0; + const asyncIterator: AsyncIterator<{ element: number; weight: number }> = { + next: () => { + iteratorIndex++; + if (iteratorIndex < 20) { + return Promise.resolve({ done: false, value: { element: iteratorIndex, weight: 2 } }); + } else { + ++waitingIterators; + return signal2; + } + } + }; + const asyncIterable: AsyncIterable<{ element: number; weight: number }> = { + [Symbol.asyncIterator]: () => asyncIterator + }; + + const expectedConcurrency: 4 = 4; + const finalPromise: Promise = Async.forEachAsync( + asyncIterable, + async (item) => { + // Do nothing + }, + { + concurrency: expectedConcurrency, + weighted: true + } + ); + + // Wait for all the instant resolutions to be done + await Async.sleep(0); + expect(waitingIterators).toEqual(expectedConcurrency); + resolve2({ done: true, value: undefined }); + await finalPromise; + }); }); describe(Async.runWithRetriesAsync.name, () => { diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index db86ca8e194..e59300c9089 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -256,6 +256,7 @@ export class OperationExecutionManager { await Async.forEachAsync( this._executionQueue, async (operation: IOperationIteratorResult) => { + console.log('queueing operation' + operation.status); let record: OperationExecutionRecord | undefined; /** * If the operation is UNASSIGNED_OPERATION, it means that the queue is not able to assign a operation. From 1e350d7a2d8550a28d30c4bf4c2ba8e36bb15424 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 13:34:33 -0400 Subject: [PATCH 03/11] fixed the initial implementation --- libraries/node-core-library/src/Async.ts | 17 ++++------------- .../src/logic/operations/AsyncOperationQueue.ts | 2 ++ .../operations/OperationExecutionManager.ts | 1 - 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index cc7efdc6381..1692ad40c6c 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -179,7 +179,7 @@ export class Async { } private static async _forEachWeightedAsync( - iterable: Iterable | AsyncIterable, + iterable: AsyncIterable, callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { @@ -187,10 +187,9 @@ export class Async { options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; let concurrentUnitsInProgress: number = 0; - const iterator: Iterator | AsyncIterator = ( - (iterable as Iterable)[Symbol.iterator] || - (iterable as AsyncIterable)[Symbol.asyncIterator] - ).call(iterable); + const iterator: AsyncIterator = (iterable as AsyncIterable)[Symbol.asyncIterator].call( + iterable + ); let arrayIndex: number = 0; @@ -199,13 +198,6 @@ export class Async { // eslint-disable-next-line no-constant-condition while (true) { const currentIteratorResult: IteratorResult = await iterator.next(); - console.log( - 'iterator result', - currentIteratorResult.done, - currentIteratorResult.value.element?.name, - currentIteratorResult.value.element?.weight, - concurrency - ); if (currentIteratorResult.done) { break; } @@ -214,7 +206,6 @@ export class Async { concurrentUnitsInProgress += weight; const promise: Promise = Promise.resolve(callback(element, arrayIndex++)).then(() => { - console.log('resolved', (element as any).name); concurrentUnitsInProgress -= weight; pending.delete(promise); }); diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index c68da9e0328..20891d53dfd 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -93,6 +93,8 @@ export class AsyncOperationQueue } } + this.assignOperations(); + if (this._completedOperations.size === this._totalOperations) { this._isDone = true; } diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index e59300c9089..db86ca8e194 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -256,7 +256,6 @@ export class OperationExecutionManager { await Async.forEachAsync( this._executionQueue, async (operation: IOperationIteratorResult) => { - console.log('queueing operation' + operation.status); let record: OperationExecutionRecord | undefined; /** * If the operation is UNASSIGNED_OPERATION, it means that the queue is not able to assign a operation. From cd403fd9e2668ed4eaaa1f3b300a0f4cab3f227a Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 15:19:25 -0400 Subject: [PATCH 04/11] reentry fix --- libraries/node-core-library/src/Async.ts | 93 ++++++++++++++++-------- 1 file changed, 64 insertions(+), 29 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 1692ad40c6c..3eddc2f0656 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -183,43 +183,78 @@ export class Async { callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { - const concurrency: number = - options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; - let concurrentUnitsInProgress: number = 0; + await new Promise((resolve: () => void, reject: (error: Error) => void) => { + const concurrency: number = + options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; + let concurrentUnitsInProgress: number = 0; - const iterator: AsyncIterator = (iterable as AsyncIterable)[Symbol.asyncIterator].call( - iterable - ); + const iterator: Iterator | AsyncIterator = (iterable as AsyncIterable)[ + Symbol.asyncIterator + ].call(iterable); - let arrayIndex: number = 0; + let arrayIndex: number = 0; + let iteratorIsComplete: boolean = false; + let promiseHasResolvedOrRejected: boolean = false; - const pending: Set> = new Set(); + async function queueOperationsAsync(): Promise { + while ( + concurrentUnitsInProgress < concurrency && + !iteratorIsComplete && + !promiseHasResolvedOrRejected + ) { + // Increment the concurrency while waiting for the iterator. + // This function is reentrant, so this ensures that at most `concurrency` executions are waiting + const limitedConcurrency = !Number.isFinite(concurrency) ? 1 : concurrency; + concurrentUnitsInProgress += limitedConcurrency; + const currentIteratorResult: IteratorResult = await iterator.next(); + // eslint-disable-next-line require-atomic-updates + iteratorIsComplete = !!currentIteratorResult.done; - // eslint-disable-next-line no-constant-condition - while (true) { - const currentIteratorResult: IteratorResult = await iterator.next(); - if (currentIteratorResult.done) { - break; - } - - const { element, weight } = currentIteratorResult.value; + if (!iteratorIsComplete) { + const currentIteratorValue: TEntry = currentIteratorResult.value; + Async.validateWeightedIterable(currentIteratorValue); + const weight: number = Math.min(currentIteratorValue.weight, concurrency); + // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. + // Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted + // operations are present. + concurrentUnitsInProgress -= limitedConcurrency; + concurrentUnitsInProgress += weight; + Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) + .then(async () => { + concurrentUnitsInProgress -= weight; + await onOperationCompletionAsync(); + }) + .catch((error) => { + promiseHasResolvedOrRejected = true; + reject(error); + }); + } else { + // The iterator is complete and there wasn't a value, so untrack the waiting state. + concurrentUnitsInProgress -= limitedConcurrency; + } + } - concurrentUnitsInProgress += weight; - const promise: Promise = Promise.resolve(callback(element, arrayIndex++)).then(() => { - concurrentUnitsInProgress -= weight; - pending.delete(promise); - }); - pending.add(promise); + if (iteratorIsComplete) { + await onOperationCompletionAsync(); + } + } - // eslint-disable-next-line no-unmodified-loop-condition - while (concurrentUnitsInProgress >= concurrency && pending.size > 0) { - await Promise.race(Array.from(pending)); + async function onOperationCompletionAsync(): Promise { + if (!promiseHasResolvedOrRejected) { + if (concurrentUnitsInProgress === 0 && iteratorIsComplete) { + promiseHasResolvedOrRejected = true; + resolve(); + } else if (!iteratorIsComplete) { + await queueOperationsAsync(); + } + } } - } - if (pending.size > 0) { - await Promise.all(Array.from(pending)); - } + queueOperationsAsync().catch((error) => { + promiseHasResolvedOrRejected = true; + reject(error); + }); + }); } /** From 56950b9523f33dc0f7f34eb565e9b8a776072b22 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 15:45:33 -0400 Subject: [PATCH 05/11] cleanup --- libraries/node-core-library/src/Async.ts | 32 +--------- .../node-core-library/src/test/Async.test.ts | 58 +++++++++++++++++-- .../logic/operations/AsyncOperationQueue.ts | 1 - .../operations/WeightedOperationPlugin.ts | 1 - 4 files changed, 54 insertions(+), 38 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 3eddc2f0656..97b8478fcad 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -73,34 +73,6 @@ function toWeightedIterator( }; } -const createLock = () => { - const queue: Array<() => Promise> = []; - let active = false; - return (fn: () => Promise) => { - let deferredResolve: (val: T) => void; - let deferredReject: () => void; - const deferred = new Promise((resolve, reject) => { - deferredResolve = resolve; - deferredReject = reject; - }); - const exec = async () => { - await fn().then(deferredResolve, deferredReject); - if (queue.length > 0) { - queue.shift()?.(); - } else { - active = false; - } - }; - if (active) { - queue.push(exec); - } else { - active = true; - exec(); - } - return deferred; - }; -}; - /** * Utilities for parallel asynchronous operations, for use with the system `Promise` APIs. * @@ -204,7 +176,7 @@ export class Async { ) { // Increment the concurrency while waiting for the iterator. // This function is reentrant, so this ensures that at most `concurrency` executions are waiting - const limitedConcurrency = !Number.isFinite(concurrency) ? 1 : concurrency; + const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; const currentIteratorResult: IteratorResult = await iterator.next(); // eslint-disable-next-line require-atomic-updates @@ -217,8 +189,8 @@ export class Async { // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. // Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted // operations are present. - concurrentUnitsInProgress -= limitedConcurrency; concurrentUnitsInProgress += weight; + concurrentUnitsInProgress -= limitedConcurrency; Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) .then(async () => { concurrentUnitsInProgress -= weight; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 3dc57b4ad17..b090270a4b3 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -273,20 +273,21 @@ describe(Async.name, () => { [Symbol.asyncIterator]: () => asyncIterator }; - const expectedConcurrency: 4 = 4; const finalPromise: Promise = Async.forEachAsync( asyncIterable, async (item) => { // Do nothing }, { - concurrency: expectedConcurrency + concurrency: 4 } ); // Wait for all the instant resolutions to be done await Async.sleep(0); - expect(waitingIterators).toEqual(expectedConcurrency); + + // The final iteration cycle is locked, so only 1 iterator is waiting. + expect(waitingIterators).toEqual(1); resolve2({ done: true, value: undefined }); await finalPromise; }); @@ -519,21 +520,66 @@ describe(Async.name, () => { [Symbol.asyncIterator]: () => asyncIterator }; - const expectedConcurrency: 4 = 4; const finalPromise: Promise = Async.forEachAsync( asyncIterable, async (item) => { // Do nothing }, { - concurrency: expectedConcurrency, + concurrency: 4, + weighted: true + } + ); + + // Wait for all the instant resolutions to be done + await Async.sleep(0); + + // The final iteration cycle is locked, so only 1 iterator is waiting. + expect(waitingIterators).toEqual(1); + resolve2({ done: true, value: undefined }); + await finalPromise; + }); + + it('does not exceed the maxiumum concurrency for an async iterator when concurrency set to infinite', async () => { + let waitingIterators: number = 0; + + let resolve2!: (value: { done: true; value: undefined }) => void; + const signal2: Promise<{ done: true; value: undefined }> = new Promise((resolve, reject) => { + resolve2 = resolve; + }); + + let iteratorIndex: number = 0; + const asyncIterator: AsyncIterator<{ element: number; weight: number }> = { + next: () => { + iteratorIndex++; + if (iteratorIndex < 20) { + return Promise.resolve({ done: false, value: { element: iteratorIndex, weight: 2 } }); + } else { + ++waitingIterators; + return signal2; + } + } + }; + const asyncIterable: AsyncIterable<{ element: number; weight: number }> = { + [Symbol.asyncIterator]: () => asyncIterator + }; + + const finalPromise: Promise = Async.forEachAsync( + asyncIterable, + async (item) => { + // Do nothing + }, + { + concurrency: Number.POSITIVE_INFINITY, weighted: true } ); // Wait for all the instant resolutions to be done await Async.sleep(0); - expect(waitingIterators).toEqual(expectedConcurrency); + + // The final iteration cycle is locked, so only 1 iterator is waiting. + expect(waitingIterators).toEqual(1); resolve2({ done: true, value: undefined }); await finalPromise; }); diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index 20891d53dfd..a653208f122 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -140,7 +140,6 @@ export class AsyncOperationQueue // This task is ready to process, hand it to the iterator. // Needs to have queue semantics, otherwise tools that iterate it get confused record.status = OperationStatus.Queued; - console.log('queueing', record.weight, record.name); waitingIterators.shift()!({ value: record, done: false diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts index 1880f28961d..c33e5971542 100644 --- a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -44,7 +44,6 @@ function weightOperations( operation.weight = operationSettings.weight; } } - console.log('Operation weight:', operation.name, operation.weight); Async.validateWeightedIterable(operation); } return operations; From 530939d7b8cbae981cef02291e5f4d8c50b9f3de Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 15:48:43 -0400 Subject: [PATCH 06/11] remove unnecessary test case --- .../node-core-library/src/test/Async.test.ts | 44 ------------------- 1 file changed, 44 deletions(-) diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index b090270a4b3..70fd6e7d967 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -539,50 +539,6 @@ describe(Async.name, () => { resolve2({ done: true, value: undefined }); await finalPromise; }); - - it('does not exceed the maxiumum concurrency for an async iterator when concurrency set to infinite', async () => { - let waitingIterators: number = 0; - - let resolve2!: (value: { done: true; value: undefined }) => void; - const signal2: Promise<{ done: true; value: undefined }> = new Promise((resolve, reject) => { - resolve2 = resolve; - }); - - let iteratorIndex: number = 0; - const asyncIterator: AsyncIterator<{ element: number; weight: number }> = { - next: () => { - iteratorIndex++; - if (iteratorIndex < 20) { - return Promise.resolve({ done: false, value: { element: iteratorIndex, weight: 2 } }); - } else { - ++waitingIterators; - return signal2; - } - } - }; - const asyncIterable: AsyncIterable<{ element: number; weight: number }> = { - [Symbol.asyncIterator]: () => asyncIterator - }; - - const finalPromise: Promise = Async.forEachAsync( - asyncIterable, - async (item) => { - // Do nothing - }, - { - concurrency: Number.POSITIVE_INFINITY, - weighted: true - } - ); - - // Wait for all the instant resolutions to be done - await Async.sleep(0); - - // The final iteration cycle is locked, so only 1 iterator is waiting. - expect(waitingIterators).toEqual(1); - resolve2({ done: true, value: undefined }); - await finalPromise; - }); }); describe(Async.runWithRetriesAsync.name, () => { From 0d66892004df20f66ffa850e31f3b8049483905c Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 15:58:16 -0400 Subject: [PATCH 07/11] adjust comment --- libraries/node-core-library/src/Async.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 97b8478fcad..0d733f63c69 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -174,8 +174,9 @@ export class Async { !iteratorIsComplete && !promiseHasResolvedOrRejected ) { - // Increment the concurrency while waiting for the iterator. - // This function is reentrant, so this ensures that at most `concurrency` executions are waiting + // Increment the current concurrency units in progress by the concurrency limit before fetching the iterator weight. + // This function is reentrant, so this if concurrency is finite, at most 1 operation will be waiting. If it's infinite, + // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; const currentIteratorResult: IteratorResult = await iterator.next(); @@ -185,14 +186,17 @@ export class Async { if (!iteratorIsComplete) { const currentIteratorValue: TEntry = currentIteratorResult.value; Async.validateWeightedIterable(currentIteratorValue); + // Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit. const weight: number = Math.min(currentIteratorValue.weight, concurrency); - // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. - // Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted - // operations are present. + + // Remove the "lock" from the concurrency check and only apply the current weight. + // This should allow other operations to execute. concurrentUnitsInProgress += weight; concurrentUnitsInProgress -= limitedConcurrency; + Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) .then(async () => { + // Remove the operation completely from the in progress units. concurrentUnitsInProgress -= weight; await onOperationCompletionAsync(); }) From 7fb5a238e9583bc63c654c16a850e800dc4fa0b1 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 19:40:29 -0400 Subject: [PATCH 08/11] fix cobuilds with operation weighting --- .../rush-lib/src/logic/operations/OperationExecutionManager.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index db86ca8e194..c80a7b414bb 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -404,6 +404,9 @@ export class OperationExecutionManager { if (record.status !== OperationStatus.RemoteExecuting) { // If the operation was not remote, then we can notify queue that it is complete this._executionQueue.complete(record); + } else { + // Attempt to requeue other operations if the operation was remote + this._executionQueue.assignOperations(); } } } From 0d5e5ec071c516746f0d1ea251b3c5eb9592ff33 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 8 May 2024 19:57:55 -0400 Subject: [PATCH 09/11] add changeset --- ...ya-weighted-operation-reentry_2024-05-08-23-55.json | 10 ++++++++++ ...ya-weighted-operation-reentry_2024-05-08-23-55.json | 10 ++++++++++ 2 files changed, 20 insertions(+) create mode 100644 common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json create mode 100644 common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json diff --git a/common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json b/common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json new file mode 100644 index 00000000000..4d2eed04db1 --- /dev/null +++ b/common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@microsoft/rush", + "comment": "Fixes a bug in `OperationExecutionRecord` where operation weights were not correctly represented.", + "type": "none" + } + ], + "packageName": "@microsoft/rush" +} diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json new file mode 100644 index 00000000000..931e3cb6bb4 --- /dev/null +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "Fixes a bug in `Async#forEachAsync` that caused it to not respect operation weight.", + "type": "patch" + } + ], + "packageName": "@rushstack/node-core-library" +} From 410e120801432e4bd3d3838c7ca10e828d785a9f Mon Sep 17 00:00:00 2001 From: Ian Clanton-Thuon Date: Thu, 9 May 2024 20:26:37 -0700 Subject: [PATCH 10/11] Update common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json --- .../sennyeya-weighted-operation-reentry_2024-05-08-23-55.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json b/common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json index 4d2eed04db1..28793703306 100644 --- a/common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json +++ b/common/changes/@microsoft/rush/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json @@ -2,7 +2,7 @@ "changes": [ { "packageName": "@microsoft/rush", - "comment": "Fixes a bug in `OperationExecutionRecord` where operation weights were not correctly represented.", + "comment": "Fix an issue where operation weights were not respected.", "type": "none" } ], From 71d5e2413c92e3cfefe2afa7ad394308820ff94d Mon Sep 17 00:00:00 2001 From: Ian Clanton-Thuon Date: Thu, 9 May 2024 20:26:42 -0700 Subject: [PATCH 11/11] Update common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json --- .../sennyeya-weighted-operation-reentry_2024-05-08-23-55.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json index 931e3cb6bb4..b889f10baf8 100644 --- a/common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-operation-reentry_2024-05-08-23-55.json @@ -2,7 +2,7 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "Fixes a bug in `Async#forEachAsync` that caused it to not respect operation weight.", + "comment": "Fix a bug in `Async.forEachAsync` where weight wasn't respected.", "type": "patch" } ],