Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rush,node-core-library): allow weighted async concurrency #4672

Merged
merged 28 commits into from
May 5, 2024
Merged
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
919dff6
start working on weighted graph
aramissennyeydd Apr 29, 2024
ea5bae4
adds a new forEachWeightedAsync method to Async that uses operation w…
aramissennyeydd Apr 30, 2024
f16e6a9
add test cases for async weighted
aramissennyeydd May 1, 2024
cc5b1df
add a test case for weight 0
aramissennyeydd May 1, 2024
8ed8fb8
add a header comment
aramissennyeydd May 1, 2024
dbc8402
fix-api-report
aramissennyeydd May 1, 2024
0a1ebc5
fix weightedoperationplugin
aramissennyeydd May 1, 2024
fed6a23
add changesets
aramissennyeydd May 1, 2024
f935016
fix linting
aramissennyeydd May 2, 2024
06c7679
move the weighting behavior into an overload
aramissennyeydd May 2, 2024
32ad704
Apply suggestions from code review
aramissennyeydd May 2, 2024
1ac2b95
update changeset
aramissennyeydd May 2, 2024
75701f7
remove unnecessary tsdoc things
aramissennyeydd May 2, 2024
56a70a6
make weight required
aramissennyeydd May 2, 2024
a04fe82
fix api report
aramissennyeydd May 2, 2024
86ae47a
only use weights when weighted is set to true
aramissennyeydd May 2, 2024
63857ef
add a test for weighted being disabled
aramissennyeydd May 2, 2024
3406734
moving the comment to the public function
aramissennyeydd May 2, 2024
80dae8f
fix linting concern
aramissennyeydd May 2, 2024
ca1ffc6
handle larger than concurrency weights
aramissennyeydd May 2, 2024
cc2a498
Apply suggestions from code review
aramissennyeydd May 3, 2024
a4bbbff
Update libraries/rush-lib/src/schemas/rush-project.schema.json
aramissennyeydd May 3, 2024
6163ca9
address code review questions
aramissennyeydd May 3, 2024
343ef0e
add documentation for weighted
aramissennyeydd May 3, 2024
27ce6a1
fix api report
aramissennyeydd May 3, 2024
84d8ce8
add weights for map too
aramissennyeydd May 3, 2024
0598f0d
fix api report
aramissennyeydd May 3, 2024
a526067
Apply suggestions from code review
aramissennyeydd May 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Add a `\"weight\"` property to the `\"operation\"` object in the project `config/rush-project.json` file that defines an integer weight for how much of the allowed parallelism the operation uses.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Add a new `weighted: true` option to the `Async.forEachAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.",
"type": "minor"
}
],
"packageName": "@rushstack/node-core-library"
}
12 changes: 11 additions & 1 deletion common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
@@ -25,7 +25,11 @@ export class AlreadyReportedError extends Error {

// @public
export class Async {
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: IAsyncParallelismOptions | undefined): Promise<void>;
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: IAsyncParallelismOptions): Promise<void>;
// (undocumented)
static forEachAsync<TEntry extends IWeightedIterable>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options: IAsyncParallelismOptions & {
weighted: true;
}): Promise<void>;
static getSignal(): [Promise<void>, () => void, (err: Error) => void];
static mapAsync<TEntry, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: IAsyncParallelismOptions | undefined): Promise<TRetVal[]>;
static runWithRetriesAsync<TResult>({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions<TResult>): Promise<TResult>;
@@ -602,6 +606,12 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions {
encoding: BufferEncoding;
}

// @public (undocumented)
export interface IWeightedIterable {
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
// (undocumented)
weight?: number;
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
}

// @public
export class JsonFile {
// @internal (undocumented)
1 change: 1 addition & 0 deletions common/reviews/api/rush-lib.api.md
Original file line number Diff line number Diff line change
@@ -619,6 +619,7 @@ export interface IOperationSettings {
disableBuildCacheForOperation?: boolean;
operationName: string;
outputFolderNames?: string[];
weight?: number;
}

// @internal (undocumented)
122 changes: 107 additions & 15 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
@@ -5,14 +5,14 @@
* Options for controlling the parallelism of asynchronous operations.
*
* @remarks
* Used with {@link Async.mapAsync} and {@link Async.forEachAsync}.
* Used with {@link Async.mapAsync} and {@link (Async:class).(forEachAsync:2)}.
*
* @public
*/
export interface IAsyncParallelismOptions {
/**
* Optionally used with the {@link Async.mapAsync} and {@link Async.forEachAsync}
* to limit the maximum number of concurrent promises to the specified number.
* Optionally used with the {@link Async.mapAsync}, {@link (Async:class).(forEachAsync:1)}, and
* {@link (Async:class).(forEachAsync:2)} to limit the maximum number of concurrent promises to the specified number.
*/
concurrency?: number;
}
@@ -29,6 +29,43 @@ export interface IRunWithRetriesOptions<TResult> {
retryDelayMs?: number;
}

/**
* @remarks
* Used with {@link (Async:class).(forEachAsync:2)}.
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
*
* @public
*/
export interface IWeightedIterable {
weight?: number;
}

function getWeight<T>(element: T): number | undefined {
if (typeof element === 'object' && element !== null) {
return 'weight' in element ? (element.weight as number) : undefined;
}
return undefined;
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
}

function toWeightedIterator<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>
): AsyncIterable<{ element: TEntry; weighted?: number }> {
const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
).call(iterable);
return {
[Symbol.asyncIterator]: () => ({
next: async () => {
const { value, done } = await Promise.resolve(iterator.next());
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
return {
value: { element: value, weight: getWeight(value) },
done: done ?? false
};
}
})
};
}

/**
* Utilities for parallel asynchronous operations, for use with the system `Promise` APIs.
*
@@ -78,9 +115,16 @@ export class Async {
* promise for each element in the array.
*
* @remarks
* This API is similar to the system `Array#forEach`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
* This API is similar to the system `Array#forEachAsync`, except that each item can have
* a weight that determines how many concurrent operations are allowed. `Array#forEachAsync`
* is a special case of this method where weight = 1 for all items.
*
* The maximum number of concurrent operations can still be throttled using
* {@link IAsyncParallelismOptions.concurrency}, however it no longer determines the
* maximum number of operations that can be in progress at once. Instead, it determines the
* number of concurrency units that can be in progress at once. The weight of each operation
* determines how many concurrency units it takes up. For example, if the concurrency is 2
* and the first operation has a weight of 2, then only one more operation can be in progress.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
@@ -91,15 +135,15 @@ export class Async {
* from the array
* @param options - options for customizing the control flow
*/
public static async forEachAsync<TEntry>(
private static async _forEachWeightedAsync<TReturn, TEntry extends { weight?: number; element: TReturn }>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
callback: (entry: TReturn, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
): Promise<void> {
await new Promise<void>((resolve: () => void, reject: (error: Error) => void) => {
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let operationsInProgress: number = 0;
let concurrentUnitsInProgress: number = 0;

const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
@@ -111,18 +155,26 @@ export class Async {
let promiseHasResolvedOrRejected: boolean = false;

async function queueOperationsAsync(): Promise<void> {
while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) {
while (
concurrentUnitsInProgress < concurrency &&
!iteratorIsComplete &&
!promiseHasResolvedOrRejected
) {
// Increment the concurrency while waiting for the iterator.
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
operationsInProgress++;
concurrentUnitsInProgress++;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

if (!iteratorIsComplete) {
Promise.resolve(callback(currentIteratorResult.value, arrayIndex++))
const currentIteratorValue: TEntry = currentIteratorResult.value;
const weight: number = currentIteratorValue.weight ?? 1;
// If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1.
concurrentUnitsInProgress += weight - 1;
Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
.then(async () => {
operationsInProgress--;
concurrentUnitsInProgress -= weight;
await onOperationCompletionAsync();
})
.catch((error) => {
@@ -131,7 +183,7 @@ export class Async {
});
} else {
// The iterator is complete and there wasn't a value, so untrack the waiting state.
operationsInProgress--;
concurrentUnitsInProgress--;
}
}

@@ -142,7 +194,7 @@ export class Async {

async function onOperationCompletionAsync(): Promise<void> {
if (!promiseHasResolvedOrRejected) {
if (operationsInProgress === 0 && iteratorIsComplete) {
if (concurrentUnitsInProgress === 0 && iteratorIsComplete) {
promiseHasResolvedOrRejected = true;
resolve();
} else if (!iteratorIsComplete) {
@@ -158,6 +210,46 @@ export class Async {
});
}

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
*
* @remarks
* This API is similar to the system `Array#forEach`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
* overall operation will reject with the first error that was encountered.
*
* @param iterable - the array of inputs for the callback function
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
*/
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions
iclanton marked this conversation as resolved.
Show resolved Hide resolved
): Promise<void>;
public static async forEachAsync<TEntry extends IWeightedIterable>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options: IAsyncParallelismOptions & { weighted: true }
): Promise<void>;
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions & { weighted?: true }
aramissennyeydd marked this conversation as resolved.
Show resolved Hide resolved
): Promise<void> {
if (options?.weighted) {
return Async._forEachWeightedAsync(toWeightedIterator(iterable), callback, options);
}

return Async._forEachWeightedAsync(toWeightedIterator(iterable), callback, options);
}

/**
* Return a promise that resolves after the specified number of milliseconds.
*/
8 changes: 7 additions & 1 deletion libraries/node-core-library/src/index.ts
Original file line number Diff line number Diff line change
@@ -8,7 +8,13 @@
*/

export { AlreadyReportedError } from './AlreadyReportedError';
export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async';
export {
Async,
AsyncQueue,
IAsyncParallelismOptions,
IRunWithRetriesOptions,
IWeightedIterable
} from './Async';
export { Brand } from './PrimitiveTypes';
export { FileConstants, FolderConstants } from './Constants';
export { Enum } from './Enum';
Loading
Loading