Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rush,node-core-library): allow weighted async concurrency #4672

Merged
merged 28 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
919dff6
start working on weighted graph
aramissennyeydd Apr 29, 2024
ea5bae4
adds a new forEachWeightedAsync method to Async that uses operation w…
aramissennyeydd Apr 30, 2024
f16e6a9
add test cases for async weighted
aramissennyeydd May 1, 2024
cc5b1df
add a test case for weight 0
aramissennyeydd May 1, 2024
8ed8fb8
add a header comment
aramissennyeydd May 1, 2024
dbc8402
fix-api-report
aramissennyeydd May 1, 2024
0a1ebc5
fix weightedoperationplugin
aramissennyeydd May 1, 2024
fed6a23
add changesets
aramissennyeydd May 1, 2024
f935016
fix linting
aramissennyeydd May 2, 2024
06c7679
move the weighting behavior into an overload
aramissennyeydd May 2, 2024
32ad704
Apply suggestions from code review
aramissennyeydd May 2, 2024
1ac2b95
update changeset
aramissennyeydd May 2, 2024
75701f7
remove unnecessary tsdoc things
aramissennyeydd May 2, 2024
56a70a6
make weight required
aramissennyeydd May 2, 2024
a04fe82
fix api report
aramissennyeydd May 2, 2024
86ae47a
only use weights when weighted is set to true
aramissennyeydd May 2, 2024
63857ef
add a test for weighted being disabled
aramissennyeydd May 2, 2024
3406734
moving the comment to the public function
aramissennyeydd May 2, 2024
80dae8f
fix linting concern
aramissennyeydd May 2, 2024
ca1ffc6
handle larger than concurrency weights
aramissennyeydd May 2, 2024
cc2a498
Apply suggestions from code review
aramissennyeydd May 3, 2024
a4bbbff
Update libraries/rush-lib/src/schemas/rush-project.schema.json
aramissennyeydd May 3, 2024
6163ca9
address code review questions
aramissennyeydd May 3, 2024
343ef0e
add documentation for weighted
aramissennyeydd May 3, 2024
27ce6a1
fix api report
aramissennyeydd May 3, 2024
84d8ce8
add weights for map too
aramissennyeydd May 3, 2024
0598f0d
fix api report
aramissennyeydd May 3, 2024
a526067
Apply suggestions from code review
aramissennyeydd May 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Add a `\"weight\"` property to the `\"operation\"` object in the project `config/rush-project.json` file that defines an integer weight for how much of the allowed parallelism the operation uses.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Add a new `weighted: true` option to the `Async.forEachAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.",
"type": "minor"
}
],
"packageName": "@rushstack/node-core-library"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Add a new `weighted: true` option to the `Async.mapAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.",
"type": "patch"
}
],
"packageName": "@rushstack/node-core-library"
}
22 changes: 20 additions & 2 deletions common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,23 @@ export class AlreadyReportedError extends Error {

// @public
export class Async {
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: IAsyncParallelismOptions | undefined): Promise<void>;
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: (IAsyncParallelismOptions & {
weighted?: false;
}) | undefined): Promise<void>;
static forEachAsync<TEntry extends IWeighted>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options: IAsyncParallelismOptions & {
weighted: true;
}): Promise<void>;
static getSignal(): [Promise<void>, () => void, (err: Error) => void];
static mapAsync<TEntry, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: IAsyncParallelismOptions | undefined): Promise<TRetVal[]>;
static mapAsync<TEntry, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: (IAsyncParallelismOptions & {
weighted?: false;
}) | undefined): Promise<TRetVal[]>;
static mapAsync<TEntry extends IWeighted, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options: IAsyncParallelismOptions & {
weighted: true;
}): Promise<TRetVal[]>;
static runWithRetriesAsync<TResult>({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions<TResult>): Promise<TResult>;
static sleep(ms: number): Promise<void>;
// (undocumented)
static validateWeightedIterable(operation: IWeighted): void;
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
}

// @public
Expand Down Expand Up @@ -225,6 +237,7 @@ export type FolderItem = fs.Dirent;
// @public
export interface IAsyncParallelismOptions {
concurrency?: number;
weighted?: boolean;
}

// @public
Expand Down Expand Up @@ -602,6 +615,11 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions {
encoding: BufferEncoding;
}

// @public (undocumented)
export interface IWeighted {
weight: number;
}

// @public
export class JsonFile {
// @internal (undocumented)
Expand Down
1 change: 1 addition & 0 deletions common/reviews/api/rush-lib.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ export interface IOperationSettings {
disableBuildCacheForOperation?: boolean;
operationName: string;
outputFolderNames?: string[];
weight?: number;
}

// @internal (undocumented)
Expand Down
201 changes: 171 additions & 30 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@
* Options for controlling the parallelism of asynchronous operations.
*
* @remarks
* Used with {@link Async.mapAsync} and {@link Async.forEachAsync}.
* Used with {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and
* {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)}.
*
* @public
*/
export interface IAsyncParallelismOptions {
/**
* Optionally used with the {@link Async.mapAsync} and {@link Async.forEachAsync}
* to limit the maximum number of concurrent promises to the specified number.
* Optionally used with the {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and
* {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)} to limit the maximum
* number of concurrent promises to the specified number.
*/
concurrency?: number;

/**
* Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can
* take up more or less than one concurrency unit.
*/
weighted?: boolean;
}

/**
Expand All @@ -29,6 +37,42 @@ export interface IRunWithRetriesOptions<TResult> {
retryDelayMs?: number;
}

/**
* @remarks
* Used with {@link (Async:class).(forEachAsync:2)}.
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
*
* @public
*/
export interface IWeighted {
/**
* The weight of the element, used to determine the concurrency units that it will take up.
* Must be a whole number greater than or equal to 0.
iclanton marked this conversation as resolved.
Show resolved Hide resolved
*/
weight: number;
}

function toWeightedIterator<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
useWeights?: boolean
): AsyncIterable<{ element: TEntry; weight: number }> {
const iterator: Iterator<TEntry> | AsyncIterator<TEntry, TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
).call(iterable);
return {
[Symbol.asyncIterator]: () => ({
next: async () => {
// The await is necessary here, but TS will complain - it's a false positive.
const { value, done } = await iterator.next();
return {
value: { element: value, weight: useWeights ? value?.weight : 1 },
done: !!done
};
}
})
};
}

/**
* Utilities for parallel asynchronous operations, for use with the system `Promise` APIs.
*
Expand Down Expand Up @@ -58,29 +102,18 @@ export class Async {
public static async mapAsync<TEntry, TRetVal>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>,
options?: IAsyncParallelismOptions | undefined
): Promise<TRetVal[]> {
const result: TRetVal[] = [];

await Async.forEachAsync(
iterable,
async (item: TEntry, arrayIndex: number): Promise<void> => {
result[arrayIndex] = await callback(item, arrayIndex);
},
options
);

return result;
}
options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined
): Promise<TRetVal[]>;

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
* promise for each element in the array. Returns an array containing the results.
*
* @remarks
* This API is similar to the system `Array#forEach`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
* This API is similar to the system `Array#map`, except that the loop is asynchronous,
* and the maximum number of concurrent units can be throttled
* using {@link IAsyncParallelismOptions.concurrency}. Using the {@link IAsyncParallelismOptions.weighted}
* option, the weight of each operation can be specified, which determines how many concurrent units it takes up.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
Expand All @@ -90,16 +123,42 @@ export class Async {
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
* @returns an array containing the result for each callback, in the same order
* as the original input `array`
*/
public static async forEachAsync<TEntry>(
public static async mapAsync<TEntry extends IWeighted, TRetVal>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>,
options: IAsyncParallelismOptions & { weighted: true }
): Promise<TRetVal[]>;
public static async mapAsync<TEntry, TRetVal>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>,
options?: IAsyncParallelismOptions | undefined
): Promise<TRetVal[]> {
const result: TRetVal[] = [];

// @ts-expect-error https://github.com/microsoft/TypeScript/issues/22609, it succeeds against the implementation but fails against the overloads
await Async.forEachAsync(
iterable,
async (item: TEntry, arrayIndex: number): Promise<void> => {
result[arrayIndex] = await callback(item, arrayIndex);
},
options
);

return result;
}

private static async _forEachWeightedAsync<TReturn, TEntry extends { weight: number; element: TReturn }>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TReturn, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
): Promise<void> {
await new Promise<void>((resolve: () => void, reject: (error: Error) => void) => {
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let operationsInProgress: number = 0;
let concurrentUnitsInProgress: number = 0;

const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
Expand All @@ -111,18 +170,29 @@ export class Async {
let promiseHasResolvedOrRejected: boolean = false;

async function queueOperationsAsync(): Promise<void> {
while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) {
while (
concurrentUnitsInProgress < concurrency &&
!iteratorIsComplete &&
!promiseHasResolvedOrRejected
) {
// Increment the concurrency while waiting for the iterator.
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
operationsInProgress++;
concurrentUnitsInProgress++;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

if (!iteratorIsComplete) {
Promise.resolve(callback(currentIteratorResult.value, arrayIndex++))
const currentIteratorValue: TEntry = currentIteratorResult.value;
Async.validateWeightedIterable(currentIteratorValue);
const weight: number = Math.min(currentIteratorValue.weight, concurrency);
// If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1.
// Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted
// operations are present.
concurrentUnitsInProgress += weight - 1;
Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
.then(async () => {
operationsInProgress--;
concurrentUnitsInProgress -= weight;
await onOperationCompletionAsync();
})
.catch((error) => {
Expand All @@ -131,7 +201,7 @@ export class Async {
});
} else {
// The iterator is complete and there wasn't a value, so untrack the waiting state.
operationsInProgress--;
concurrentUnitsInProgress--;
}
}

Expand All @@ -142,7 +212,7 @@ export class Async {

async function onOperationCompletionAsync(): Promise<void> {
if (!promiseHasResolvedOrRejected) {
if (operationsInProgress === 0 && iteratorIsComplete) {
if (concurrentUnitsInProgress === 0 && iteratorIsComplete) {
promiseHasResolvedOrRejected = true;
resolve();
} else if (!iteratorIsComplete) {
Expand All @@ -158,6 +228,68 @@ export class Async {
});
}

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
*
* @remarks
* This API is similar to the system `Array#forEach`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
* overall operation will reject with the first error that was encountered.
*
* @param iterable - the array of inputs for the callback function
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
*/
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined
): Promise<void>;

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
*
* @remarks
* This API is similar to the other `Array#forEachAsync`, except that each item can have
* a weight that determines how many concurrent operations are allowed. The unweighted
* `Array#forEachAsync` is a special case of this method where weight = 1 for all items.
*
* The maximum number of concurrent operations can still be throttled using
* {@link IAsyncParallelismOptions.concurrency}, however it no longer determines the
* maximum number of operations that can be in progress at once. Instead, it determines the
* number of concurrency units that can be in progress at once. The weight of each operation
* determines how many concurrency units it takes up. For example, if the concurrency is 2
* and the first operation has a weight of 2, then only one more operation can be in progress.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
* overall operation will reject with the first error that was encountered.
*
* @param iterable - the array of inputs for the callback function
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
*/
public static async forEachAsync<TEntry extends IWeighted>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options: IAsyncParallelismOptions & { weighted: true }
): Promise<void>;
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions
): Promise<void> {
await Async._forEachWeightedAsync(toWeightedIterator(iterable, options?.weighted), callback, options);
}

/**
* Return a promise that resolves after the specified number of milliseconds.
*/
Expand Down Expand Up @@ -190,6 +322,15 @@ export class Async {
}
}

public static validateWeightedIterable(operation: IWeighted): void {
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
if (operation.weight < 0) {
throw new Error('Weight must be a whole number greater than or equal to 0');
}
if (operation.weight % 1 !== 0) {
throw new Error('Weight must be a whole number greater than or equal to 0');
}
}

/**
* Returns a Signal, a.k.a. a "deferred promise".
*/
Expand Down
2 changes: 1 addition & 1 deletion libraries/node-core-library/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

export { AlreadyReportedError } from './AlreadyReportedError';
export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async';
export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions, IWeighted } from './Async';
export { Brand } from './PrimitiveTypes';
export { FileConstants, FolderConstants } from './Constants';
export { Enum } from './Enum';
Expand Down
Loading
Loading