Skip to content

Commit

Permalink
Merge pull request #4672 from aramissennyeydd/sennyeya/weighted-graph
Browse files Browse the repository at this point in the history
feat(rush,node-core-library): allow weighted async concurrency
  • Loading branch information
iclanton authored May 5, 2024
2 parents 0c41a82 + a526067 commit 21afaae
Show file tree
Hide file tree
Showing 15 changed files with 484 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Add a `\"weight\"` property to the `\"operation\"` object in the project `config/rush-project.json` file that defines an integer weight for how much of the allowed parallelism the operation uses.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Add a new `weighted: true` option to the `Async.forEachAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.",
"type": "minor"
}
],
"packageName": "@rushstack/node-core-library"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Add a new `weighted: true` option to the `Async.mapAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.",
"type": "patch"
}
],
"packageName": "@rushstack/node-core-library"
}
21 changes: 19 additions & 2 deletions common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,22 @@ export class AlreadyReportedError extends Error {

// @public
export class Async {
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: IAsyncParallelismOptions | undefined): Promise<void>;
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: (IAsyncParallelismOptions & {
weighted?: false;
}) | undefined): Promise<void>;
static forEachAsync<TEntry extends IWeighted>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options: IAsyncParallelismOptions & {
weighted: true;
}): Promise<void>;
static getSignal(): [Promise<void>, () => void, (err: Error) => void];
static mapAsync<TEntry, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: IAsyncParallelismOptions | undefined): Promise<TRetVal[]>;
static mapAsync<TEntry, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: (IAsyncParallelismOptions & {
weighted?: false;
}) | undefined): Promise<TRetVal[]>;
static mapAsync<TEntry extends IWeighted, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options: IAsyncParallelismOptions & {
weighted: true;
}): Promise<TRetVal[]>;
static runWithRetriesAsync<TResult>({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions<TResult>): Promise<TResult>;
static sleep(ms: number): Promise<void>;
static validateWeightedIterable(operation: IWeighted): void;
}

// @public
Expand Down Expand Up @@ -225,6 +236,7 @@ export type FolderItem = fs.Dirent;
// @public
export interface IAsyncParallelismOptions {
concurrency?: number;
weighted?: boolean;
}

// @public
Expand Down Expand Up @@ -602,6 +614,11 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions {
encoding: BufferEncoding;
}

// @public (undocumented)
export interface IWeighted {
weight: number;
}

// @public
export class JsonFile {
// @internal (undocumented)
Expand Down
1 change: 1 addition & 0 deletions common/reviews/api/rush-lib.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ export interface IOperationSettings {
disableBuildCacheForOperation?: boolean;
operationName: string;
outputFolderNames?: string[];
weight?: number;
}

// @internal (undocumented)
Expand Down
205 changes: 175 additions & 30 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@
* Options for controlling the parallelism of asynchronous operations.
*
* @remarks
* Used with {@link Async.mapAsync} and {@link Async.forEachAsync}.
* Used with {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and
* {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)}.
*
* @public
*/
export interface IAsyncParallelismOptions {
/**
* Optionally used with the {@link Async.mapAsync} and {@link Async.forEachAsync}
* to limit the maximum number of concurrent promises to the specified number.
* Optionally used with the {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and
* {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)} to limit the maximum
* number of concurrent promises to the specified number.
*/
concurrency?: number;

/**
* Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can
* take up more or less than one concurrency unit.
*/
weighted?: boolean;
}

/**
Expand All @@ -29,6 +37,42 @@ export interface IRunWithRetriesOptions<TResult> {
retryDelayMs?: number;
}

/**
* @remarks
* Used with {@link (Async:class).(forEachAsync:2)} and {@link (Async:class).(mapAsync:2)}.
*
* @public
*/
export interface IWeighted {
/**
* The weight of the element, used to determine the concurrency units that it will take up.
* Must be a whole number greater than or equal to 0.
*/
weight: number;
}

function toWeightedIterator<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
useWeights?: boolean
): AsyncIterable<{ element: TEntry; weight: number }> {
const iterator: Iterator<TEntry> | AsyncIterator<TEntry, TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
).call(iterable);
return {
[Symbol.asyncIterator]: () => ({
next: async () => {
// The await is necessary here, but TS will complain - it's a false positive.
const { value, done } = await iterator.next();
return {
value: { element: value, weight: useWeights ? value?.weight : 1 },
done: !!done
};
}
})
};
}

/**
* Utilities for parallel asynchronous operations, for use with the system `Promise` APIs.
*
Expand Down Expand Up @@ -58,29 +102,18 @@ export class Async {
public static async mapAsync<TEntry, TRetVal>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>,
options?: IAsyncParallelismOptions | undefined
): Promise<TRetVal[]> {
const result: TRetVal[] = [];

await Async.forEachAsync(
iterable,
async (item: TEntry, arrayIndex: number): Promise<void> => {
result[arrayIndex] = await callback(item, arrayIndex);
},
options
);

return result;
}
options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined
): Promise<TRetVal[]>;

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
* promise for each element in the array. Returns an array containing the results.
*
* @remarks
* This API is similar to the system `Array#forEach`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
* This API is similar to the system `Array#map`, except that the loop is asynchronous,
* and the maximum number of concurrent units can be throttled
* using {@link IAsyncParallelismOptions.concurrency}. Using the {@link IAsyncParallelismOptions.weighted}
* option, the weight of each operation can be specified, which determines how many concurrent units it takes up.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
Expand All @@ -90,16 +123,42 @@ export class Async {
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
* @returns an array containing the result for each callback, in the same order
* as the original input `array`
*/
public static async forEachAsync<TEntry>(
public static async mapAsync<TEntry extends IWeighted, TRetVal>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>,
options: IAsyncParallelismOptions & { weighted: true }
): Promise<TRetVal[]>;
public static async mapAsync<TEntry, TRetVal>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>,
options?: IAsyncParallelismOptions | undefined
): Promise<TRetVal[]> {
const result: TRetVal[] = [];

// @ts-expect-error https://github.com/microsoft/TypeScript/issues/22609, it succeeds against the implementation but fails against the overloads
await Async.forEachAsync(
iterable,
async (item: TEntry, arrayIndex: number): Promise<void> => {
result[arrayIndex] = await callback(item, arrayIndex);
},
options
);

return result;
}

private static async _forEachWeightedAsync<TReturn, TEntry extends { weight: number; element: TReturn }>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TReturn, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
): Promise<void> {
await new Promise<void>((resolve: () => void, reject: (error: Error) => void) => {
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let operationsInProgress: number = 0;
let concurrentUnitsInProgress: number = 0;

const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
Expand All @@ -111,18 +170,29 @@ export class Async {
let promiseHasResolvedOrRejected: boolean = false;

async function queueOperationsAsync(): Promise<void> {
while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) {
while (
concurrentUnitsInProgress < concurrency &&
!iteratorIsComplete &&
!promiseHasResolvedOrRejected
) {
// Increment the concurrency while waiting for the iterator.
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
operationsInProgress++;
concurrentUnitsInProgress++;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

if (!iteratorIsComplete) {
Promise.resolve(callback(currentIteratorResult.value, arrayIndex++))
const currentIteratorValue: TEntry = currentIteratorResult.value;
Async.validateWeightedIterable(currentIteratorValue);
const weight: number = Math.min(currentIteratorValue.weight, concurrency);
// If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1.
// Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted
// operations are present.
concurrentUnitsInProgress += weight - 1;
Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
.then(async () => {
operationsInProgress--;
concurrentUnitsInProgress -= weight;
await onOperationCompletionAsync();
})
.catch((error) => {
Expand All @@ -131,7 +201,7 @@ export class Async {
});
} else {
// The iterator is complete and there wasn't a value, so untrack the waiting state.
operationsInProgress--;
concurrentUnitsInProgress--;
}
}

Expand All @@ -142,7 +212,7 @@ export class Async {

async function onOperationCompletionAsync(): Promise<void> {
if (!promiseHasResolvedOrRejected) {
if (operationsInProgress === 0 && iteratorIsComplete) {
if (concurrentUnitsInProgress === 0 && iteratorIsComplete) {
promiseHasResolvedOrRejected = true;
resolve();
} else if (!iteratorIsComplete) {
Expand All @@ -158,6 +228,68 @@ export class Async {
});
}

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
*
* @remarks
* This API is similar to the system `Array#forEach`, except that the loop is asynchronous,
* and the maximum number of concurrent promises can be throttled
* using {@link IAsyncParallelismOptions.concurrency}.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
* overall operation will reject with the first error that was encountered.
*
* @param iterable - the array of inputs for the callback function
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
*/
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined
): Promise<void>;

/**
* Given an input array and a `callback` function, invoke the callback to start a
* promise for each element in the array.
*
* @remarks
* This API is similar to the other `Array#forEachAsync`, except that each item can have
* a weight that determines how many concurrent operations are allowed. The unweighted
* `Array#forEachAsync` is a special case of this method where weight = 1 for all items.
*
* The maximum number of concurrent operations can still be throttled using
* {@link IAsyncParallelismOptions.concurrency}, however it no longer determines the
* maximum number of operations that can be in progress at once. Instead, it determines the
* number of concurrency units that can be in progress at once. The weight of each operation
* determines how many concurrency units it takes up. For example, if the concurrency is 2
* and the first operation has a weight of 2, then only one more operation can be in progress.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
* overall operation will reject with the first error that was encountered.
*
* @param iterable - the array of inputs for the callback function
* @param callback - a function that starts an asynchronous promise for an element
* from the array
* @param options - options for customizing the control flow
*/
public static async forEachAsync<TEntry extends IWeighted>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options: IAsyncParallelismOptions & { weighted: true }
): Promise<void>;
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions
): Promise<void> {
await Async._forEachWeightedAsync(toWeightedIterator(iterable, options?.weighted), callback, options);
}

/**
* Return a promise that resolves after the specified number of milliseconds.
*/
Expand Down Expand Up @@ -190,6 +322,19 @@ export class Async {
}
}

/**
* Ensures that the argument is a valid {@link IWeighted}, with a `weight` argument that
* is a positive integer or 0.
*/
public static validateWeightedIterable(operation: IWeighted): void {
if (operation.weight < 0) {
throw new Error('Weight must be a whole number greater than or equal to 0');
}
if (operation.weight % 1 !== 0) {
throw new Error('Weight must be a whole number greater than or equal to 0');
}
}

/**
* Returns a Signal, a.k.a. a "deferred promise".
*/
Expand Down
2 changes: 1 addition & 1 deletion libraries/node-core-library/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

export { AlreadyReportedError } from './AlreadyReportedError';
export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async';
export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions, IWeighted } from './Async';
export { Brand } from './PrimitiveTypes';
export { FileConstants, FolderConstants } from './Constants';
export { Enum } from './Enum';
Expand Down
Loading

0 comments on commit 21afaae

Please sign in to comment.