Skip to content

Commit

Permalink
Merge pull request #4688 from aramissennyeydd/sennyeya/weighted-opera…
Browse files Browse the repository at this point in the history
…tion-reentry

fix(node-core-library): iterator weighting isn't fully respected by `Async#forEachAsync`
  • Loading branch information
iclanton authored May 10, 2024
2 parents 890322a + 71d5e24 commit b7704b4
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Fix an issue where operation weights were not respected.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Fix a bug in `Async.forEachAsync` where weight wasn't respected.",
"type": "patch"
}
],
"packageName": "@rushstack/node-core-library"
}
31 changes: 18 additions & 13 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export class Async {
}

private static async _forEachWeightedAsync<TReturn, TEntry extends { weight: number; element: TReturn }>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
iterable: AsyncIterable<TEntry>,
callback: (entry: TReturn, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
): Promise<void> {
Expand All @@ -160,10 +160,9 @@ export class Async {
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let concurrentUnitsInProgress: number = 0;

const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
).call(iterable);
const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (iterable as AsyncIterable<TEntry>)[
Symbol.asyncIterator
].call(iterable);

let arrayIndex: number = 0;
let iteratorIsComplete: boolean = false;
Expand All @@ -175,23 +174,29 @@ export class Async {
!iteratorIsComplete &&
!promiseHasResolvedOrRejected
) {
// Increment the concurrency while waiting for the iterator.
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
concurrentUnitsInProgress++;
// Increment the current concurrency units in progress by the concurrency limit before fetching the iterator weight.
// This function is reentrant, so this if concurrency is finite, at most 1 operation will be waiting. If it's infinite,
// there will be effectively no cap on the number of operations waiting.
const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency;
concurrentUnitsInProgress += limitedConcurrency;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

if (!iteratorIsComplete) {
const currentIteratorValue: TEntry = currentIteratorResult.value;
Async.validateWeightedIterable(currentIteratorValue);
// Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit.
const weight: number = Math.min(currentIteratorValue.weight, concurrency);
// If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1.
// Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted
// operations are present.
concurrentUnitsInProgress += weight - 1;

// Remove the "lock" from the concurrency check and only apply the current weight.
// This should allow other operations to execute.
concurrentUnitsInProgress += weight;
concurrentUnitsInProgress -= limitedConcurrency;

Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
.then(async () => {
// Remove the operation completely from the in progress units.
concurrentUnitsInProgress -= weight;
await onOperationCompletionAsync();
})
Expand All @@ -201,7 +206,7 @@ export class Async {
});
} else {
// The iterator is complete and there wasn't a value, so untrack the waiting state.
concurrentUnitsInProgress--;
concurrentUnitsInProgress -= limitedConcurrency;
}
}

Expand Down
51 changes: 48 additions & 3 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,20 +273,21 @@ describe(Async.name, () => {
[Symbol.asyncIterator]: () => asyncIterator
};

const expectedConcurrency: 4 = 4;
const finalPromise: Promise<void> = Async.forEachAsync(
asyncIterable,
async (item) => {
// Do nothing
},
{
concurrency: expectedConcurrency
concurrency: 4
}
);

// Wait for all the instant resolutions to be done
await Async.sleep(0);
expect(waitingIterators).toEqual(expectedConcurrency);

// The final iteration cycle is locked, so only 1 iterator is waiting.
expect(waitingIterators).toEqual(1);
resolve2({ done: true, value: undefined });
await finalPromise;
});
Expand Down Expand Up @@ -494,6 +495,50 @@ describe(Async.name, () => {
expect(fn).toHaveBeenCalledTimes(10);
expect(maxRunning).toEqual(9);
});

it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => {
let waitingIterators: number = 0;

let resolve2!: (value: { done: true; value: undefined }) => void;
const signal2: Promise<{ done: true; value: undefined }> = new Promise((resolve, reject) => {
resolve2 = resolve;
});

let iteratorIndex: number = 0;
const asyncIterator: AsyncIterator<{ element: number; weight: number }> = {
next: () => {
iteratorIndex++;
if (iteratorIndex < 20) {
return Promise.resolve({ done: false, value: { element: iteratorIndex, weight: 2 } });
} else {
++waitingIterators;
return signal2;
}
}
};
const asyncIterable: AsyncIterable<{ element: number; weight: number }> = {
[Symbol.asyncIterator]: () => asyncIterator
};

const finalPromise: Promise<void> = Async.forEachAsync(
asyncIterable,
async (item) => {
// Do nothing
},
{
concurrency: 4,
weighted: true
}
);

// Wait for all the instant resolutions to be done
await Async.sleep(0);

// The final iteration cycle is locked, so only 1 iterator is waiting.
expect(waitingIterators).toEqual(1);
resolve2({ done: true, value: undefined });
await finalPromise;
});
});

describe(Async.runWithRetriesAsync.name, () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ export class AsyncOperationQueue
}
}

this.assignOperations();

if (this._completedOperations.size === this._totalOperations) {
this._isDone = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ export class OperationExecutionManager {
if (record.status !== OperationStatus.RemoteExecuting) {
// If the operation was not remote, then we can notify queue that it is complete
this._executionQueue.complete(record);
} else {
// Attempt to requeue other operations if the operation was remote
this._executionQueue.assignOperations();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
public readonly stdioSummarizer: StdioSummarizer = new StdioSummarizer();

public readonly runner: IOperationRunner;
public readonly weight: number;
public readonly associatedPhase: IPhase | undefined;
public readonly associatedProject: RushConfigurationProject | undefined;
public readonly _operationMetadataManager: OperationMetadataManager | undefined;
Expand All @@ -117,7 +116,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {

this.operation = operation;
this.runner = runner;
this.weight = operation.weight;
this.associatedPhase = associatedPhase;
this.associatedProject = associatedProject;
if (operation.associatedPhase && operation.associatedProject) {
Expand All @@ -134,6 +132,10 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
return this.runner.name;
}

public get weight(): number {
return this.operation.weight;
}

public get debugMode(): boolean {
return this._context.debugMode;
}
Expand Down

0 comments on commit b7704b4

Please sign in to comment.