Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[node-core-library] iterator weighting isn't fully respected by Async#forEachAsync #4688

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Fixes a bug in `OperationExecutionRecord` where operation weights were not correctly represented.",
iclanton marked this conversation as resolved.
Show resolved Hide resolved
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Fixes a bug in `Async#forEachAsync` that caused it to not respect operation weight.",
iclanton marked this conversation as resolved.
Show resolved Hide resolved
"type": "patch"
}
],
"packageName": "@rushstack/node-core-library"
}
31 changes: 18 additions & 13 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export class Async {
}

private static async _forEachWeightedAsync<TReturn, TEntry extends { weight: number; element: TReturn }>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
iterable: AsyncIterable<TEntry>,
callback: (entry: TReturn, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
): Promise<void> {
Expand All @@ -160,10 +160,9 @@ export class Async {
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let concurrentUnitsInProgress: number = 0;

const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
).call(iterable);
const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (iterable as AsyncIterable<TEntry>)[
Symbol.asyncIterator
].call(iterable);

let arrayIndex: number = 0;
let iteratorIsComplete: boolean = false;
Expand All @@ -175,23 +174,29 @@ export class Async {
!iteratorIsComplete &&
!promiseHasResolvedOrRejected
) {
// Increment the concurrency while waiting for the iterator.
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
concurrentUnitsInProgress++;
// Increment the current concurrency units in progress by the concurrency limit before fetching the iterator weight.
// This function is reentrant, so this if concurrency is finite, at most 1 operation will be waiting. If it's infinite,
// there will be effectively no cap on the number of operations waiting.
const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency;
concurrentUnitsInProgress += limitedConcurrency;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

if (!iteratorIsComplete) {
const currentIteratorValue: TEntry = currentIteratorResult.value;
Async.validateWeightedIterable(currentIteratorValue);
// Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit.
const weight: number = Math.min(currentIteratorValue.weight, concurrency);
// If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1.
// Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted
// operations are present.
concurrentUnitsInProgress += weight - 1;

// Remove the "lock" from the concurrency check and only apply the current weight.
// This should allow other operations to execute.
concurrentUnitsInProgress += weight;
concurrentUnitsInProgress -= limitedConcurrency;

Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
.then(async () => {
// Remove the operation completely from the in progress units.
concurrentUnitsInProgress -= weight;
await onOperationCompletionAsync();
})
Expand All @@ -201,7 +206,7 @@ export class Async {
});
} else {
// The iterator is complete and there wasn't a value, so untrack the waiting state.
concurrentUnitsInProgress--;
concurrentUnitsInProgress -= limitedConcurrency;
}
}

Expand Down
51 changes: 48 additions & 3 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,20 +273,21 @@ describe(Async.name, () => {
[Symbol.asyncIterator]: () => asyncIterator
};

const expectedConcurrency: 4 = 4;
const finalPromise: Promise<void> = Async.forEachAsync(
asyncIterable,
async (item) => {
// Do nothing
},
{
concurrency: expectedConcurrency
concurrency: 4
}
);

// Wait for all the instant resolutions to be done
await Async.sleep(0);
expect(waitingIterators).toEqual(expectedConcurrency);

// The final iteration cycle is locked, so only 1 iterator is waiting.
expect(waitingIterators).toEqual(1);
resolve2({ done: true, value: undefined });
await finalPromise;
});
Expand Down Expand Up @@ -494,6 +495,50 @@ describe(Async.name, () => {
expect(fn).toHaveBeenCalledTimes(10);
expect(maxRunning).toEqual(9);
});

it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => {
let waitingIterators: number = 0;

let resolve2!: (value: { done: true; value: undefined }) => void;
const signal2: Promise<{ done: true; value: undefined }> = new Promise((resolve, reject) => {
resolve2 = resolve;
});

let iteratorIndex: number = 0;
const asyncIterator: AsyncIterator<{ element: number; weight: number }> = {
next: () => {
iteratorIndex++;
if (iteratorIndex < 20) {
return Promise.resolve({ done: false, value: { element: iteratorIndex, weight: 2 } });
} else {
++waitingIterators;
return signal2;
}
}
};
const asyncIterable: AsyncIterable<{ element: number; weight: number }> = {
[Symbol.asyncIterator]: () => asyncIterator
};

const finalPromise: Promise<void> = Async.forEachAsync(
asyncIterable,
async (item) => {
// Do nothing
},
{
concurrency: 4,
weighted: true
}
);

// Wait for all the instant resolutions to be done
await Async.sleep(0);

// The final iteration cycle is locked, so only 1 iterator is waiting.
expect(waitingIterators).toEqual(1);
resolve2({ done: true, value: undefined });
await finalPromise;
});
});

describe(Async.runWithRetriesAsync.name, () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ export class AsyncOperationQueue
}
}

this.assignOperations();

if (this._completedOperations.size === this._totalOperations) {
this._isDone = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ export class OperationExecutionManager {
if (record.status !== OperationStatus.RemoteExecuting) {
// If the operation was not remote, then we can notify queue that it is complete
this._executionQueue.complete(record);
} else {
// Attempt to requeue other operations if the operation was remote
this._executionQueue.assignOperations();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
public readonly stdioSummarizer: StdioSummarizer = new StdioSummarizer();

public readonly runner: IOperationRunner;
public readonly weight: number;
public readonly associatedPhase: IPhase | undefined;
public readonly associatedProject: RushConfigurationProject | undefined;
public readonly _operationMetadataManager: OperationMetadataManager | undefined;
Expand All @@ -117,7 +116,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {

this.operation = operation;
this.runner = runner;
this.weight = operation.weight;
this.associatedPhase = associatedPhase;
this.associatedProject = associatedProject;
if (operation.associatedPhase && operation.associatedProject) {
Expand All @@ -134,6 +132,10 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
return this.runner.name;
}

public get weight(): number {
return this.operation.weight;
}

public get debugMode(): boolean {
return this._context.debugMode;
}
Expand Down
Loading