Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[node-core-library][rush] Support weighted async concurrency #4092

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Allow operations to have variable weight for concurrency purposes.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Support variable concurrency based on task weights in `Async.forEachAsync`.",
"type": "minor"
}
],
"packageName": "@rushstack/node-core-library"
}
9 changes: 8 additions & 1 deletion common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ export class AnsiEscape {

// @beta
export class Async {
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: IAsyncParallelismOptions | undefined): Promise<void>;
static forEachAsync<TEntry>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<void>, options?: TEntry extends {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{ weight?: number } must be declared as an interface, so that we have some place to document the weight property.

weight?: number;
Copy link
Collaborator

@octogonz octogonz May 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type algebra seems overcomplicated. I think is trying to say that IF the options specifies useWeight, THEN the entry must define a weight. I don't think it actually accomplishes that, but it is a slightly odd requirement given that both these properties are optional. If weight defaults to 1, then is it really important to have been able to specify it, even though we chose not to?

I think it might be better to simply say that "If useWeight=true, then forEachSync will use the weight property if present, otherwise the weight defaults to 1." The only realistic concern here is if old code randomly had a weight property, that suddenly gains new meaning in this release, but you've already solved that by defaulting useWeight to false.

This would also eliminate the need for a separate IAsyncParallelismOptionsWithWeight interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check on the optional property is to ensure that the TEntry type does not have a property weight that is incompatible with type number. I originally considered writing this as an overload, but I'm generally leery of overloads in case something uses typeof Async.forEachAsync somewhere. Might be cleanest to just add a completely separate entry point.

} ? IAsyncParallelismOptionsWithWeight : IAsyncParallelismOptions | undefined): Promise<void>;
static mapAsync<TEntry, TRetVal>(iterable: Iterable<TEntry> | AsyncIterable<TEntry>, callback: (entry: TEntry, arrayIndex: number) => Promise<TRetVal>, options?: IAsyncParallelismOptions | undefined): Promise<TRetVal[]>;
static runWithRetriesAsync<TResult>({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions<TResult>): Promise<TResult>;
static sleep(ms: number): Promise<void>;
Expand Down Expand Up @@ -319,6 +321,11 @@ export interface IAsyncParallelismOptions {
concurrency?: number;
}

// @beta
export interface IAsyncParallelismOptionsWithWeight extends IAsyncParallelismOptions {
useWeight?: boolean;
}

// @beta (undocumented)
export interface IColorableSequence {
// (undocumented)
Expand Down
103 changes: 52 additions & 51 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ export interface IAsyncParallelismOptions {
concurrency?: number;
}

/**
* Options for controlling the parallelism of asynchronous operations that have a property `weight?: number`.
*
* @remarks
* Used with {@link Async.forEachAsync}.
*
* @beta
*/
export interface IAsyncParallelismOptionsWithWeight extends IAsyncParallelismOptions {
/**
* If set, then will use `item.weight` instead of `1` as the parallelism consumed by a given iteration.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs more explanation. I had to read the code carefully to figure out what weight actually represents. My guess was that somehow affected the priority of the task (like Unix nice), but that was wrong. Also, the PR description and code comments never mention why someone would want this feature, i.e. what problem it solves.

Copy link
Collaborator

@octogonz octogonz May 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try something like this:

  /**
   * Optionally used with the  {@link Async.mapAsync} and {@link Async.forEachAsync}
   * to limit amount of concurrency.
   * @remarks
   * By default, `concurrency` simply specifies the maximum number of promises that can be active
   * simultaneously.  However if `useWeight` is true, then behavior 
   * is different. See the {@link IAsyncParallelismOptions.useWeight} documentation for details.
   */
  concurrency?: number;

  /**
   * Set `useWeight` to true to configure the {@link IAsyncParallelismOptions.concurrency} 
   * setting to limit the total resource cost instead of limiting the total number of promises.
   * @remarks
   * By default, the `concurrency` specifies the maximum number of promises that can be
   * active simultaneously.  If promises have different resource costs, and those costs are known
   * in advance, then `useWeight` changes `concurrency` so that it limits the sum of the weights
   * of the entries passed to {@link Async.mapAsync} or {@link Async.forEachAsync}.
   * 
   * The {@link IAsyncEntry.weight} property is optional and defaults to 1.  (If all entries
   * use the default of 1, then the behavior is identical to `useWeight=false`.)
   * Weights can be fractional but not negative.
   */
  useWeight?: boolean;
  /**
   * If {@link Async.mapAsync} or {@link Async.forEachAsync} are used to parallelize tasks
   * that have different resource costs, and this property can be used to specify the resource
   * cost so that it can be limited by {@link IAsyncParallelismOptions.concurrency}.
   * @remarks
   * This property is ignored unless `useWeight` is true.  See the {@link 
   * IAsyncParallelismOptions.useWeight} documentation for details.
   */
  weight?: number

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is weight the best word? Maybe concurrencyResourceCost (or at least concurrencyWeight)? It would be great of the naming gave some clue that concurrency and weight are part of the same feature.

Also we are asking to assign weight directly to somebody else's class/interface, so the more unique this name is, the less likely it is to accidentally conflict with an unrelated member.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name weight was chosen purely to match its existing presence on Rush's Operation class, but that could be changed to whatever name seems to make the most sense.

I'm also open to having an altogether separate entry point for tasks whose CPU resource needs are not uniformly 1.

*/
useWeight?: boolean;
}

/**
* @remarks
* Used with {@link Async.runWithRetriesAsync}.
Expand Down Expand Up @@ -94,68 +109,54 @@ export class Async {
public static async forEachAsync<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
callback: (entry: TEntry, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
options?: TEntry extends { weight?: number }
? IAsyncParallelismOptionsWithWeight
: IAsyncParallelismOptions | undefined
): Promise<void> {
await new Promise<void>((resolve: () => void, reject: (error: Error) => void) => {
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
let operationsInProgress: number = 0;
const concurrency: number =
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;

const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
).call(iterable);
const useWeight: boolean | undefined = (options as IAsyncParallelismOptionsWithWeight)?.useWeight;

let arrayIndex: number = 0;
let iteratorIsComplete: boolean = false;
let promiseHasResolvedOrRejected: boolean = false;
const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
).call(iterable);

async function queueOperationsAsync(): Promise<void> {
while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) {
// Increment the concurrency while waiting for the iterator.
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
operationsInProgress++;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;
let arrayIndex: number = 0;
let usedCapacity: number = 0;

if (!iteratorIsComplete) {
Promise.resolve(callback(currentIteratorResult.value, arrayIndex++))
.then(async () => {
operationsInProgress--;
await onOperationCompletionAsync();
})
.catch((error) => {
promiseHasResolvedOrRejected = true;
reject(error);
});
} else {
// The iterator is complete and there wasn't a value, so untrack the waiting state.
operationsInProgress--;
}
}
const pending: Set<Promise<void>> = new Set();

if (iteratorIsComplete) {
await onOperationCompletionAsync();
}
// eslint-disable-next-line no-constant-condition
while (true) {
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
if (currentIteratorResult.done) {
break;
}

async function onOperationCompletionAsync(): Promise<void> {
if (!promiseHasResolvedOrRejected) {
if (operationsInProgress === 0 && iteratorIsComplete) {
promiseHasResolvedOrRejected = true;
resolve();
} else if (!iteratorIsComplete) {
await queueOperationsAsync();
}
}
const { value } = currentIteratorResult;
const weight: number = useWeight ? (value as { weight?: number })?.weight ?? 1 : 1;
if (weight < 0) {
throw new Error(`Invalid weight ${weight}. Weights must be greater than or equal to 0.`);
}

queueOperationsAsync().catch((error) => {
promiseHasResolvedOrRejected = true;
reject(error);
usedCapacity += weight;
const promise: Promise<void> = Promise.resolve(callback(value, arrayIndex++)).then(() => {
usedCapacity -= weight;
pending.delete(promise);
});
});
pending.add(promise);

// eslint-disable-next-line no-unmodified-loop-condition
while (usedCapacity >= concurrency && pending.size > 0) {
await Promise.race(Array.from(pending));
}
}

if (pending.size > 0) {
await Promise.all(Array.from(pending));
}
}

/**
Expand Down
8 changes: 7 additions & 1 deletion libraries/node-core-library/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

export { AlreadyReportedError } from './AlreadyReportedError';
export { AnsiEscape, IAnsiEscapeConvertForTestsOptions } from './Terminal/AnsiEscape';
export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async';
export {
Async,
AsyncQueue,
IAsyncParallelismOptions,
IAsyncParallelismOptionsWithWeight,
IRunWithRetriesOptions
} from './Async';
export { Brand } from './PrimitiveTypes';
export { FileConstants, FolderConstants } from './Constants';
export { Enum } from './Enum';
Expand Down
53 changes: 51 additions & 2 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ describe(Async.name, () => {
).rejects.toThrow(expectedError);
});

it('does not exceed the maxiumum concurrency for an async iterator', async () => {
it('only has at most 1 waiting iterator, regardless of concurrency', async () => {
let waitingIterators: number = 0;

let resolve2!: (value: { done: true; value: undefined }) => void;
Expand Down Expand Up @@ -286,11 +286,60 @@ describe(Async.name, () => {

// Wait for all the instant resolutions to be done
await Async.sleep(1);
expect(waitingIterators).toEqual(expectedConcurrency);
expect(waitingIterators).toEqual(1);
resolve2({ done: true, value: undefined });
await finalPromise;
});

it('if concurrency and useWeight are set, ensures no more than N/weight operations occur in parallel', async () => {
interface INumberWithWeight {
n: number;
weight: number;
}

interface ITestCase {
concurrency: number;
weight: number;
expectedConcurrency: number;
}

const cases: ITestCase[] = [
{
concurrency: 2,
weight: 0.5,
expectedConcurrency: 4
},
{
concurrency: 1,
weight: 0.4,
expectedConcurrency: 3
},
{
concurrency: 1,
weight: 0.17,
expectedConcurrency: 6
}
];

for (const { concurrency, weight, expectedConcurrency } of cases) {
let running: number = 0;
let maxRunning: number = 0;

const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ n, weight }));

const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
running++;
await Async.sleep(1);
maxRunning = Math.max(maxRunning, running);
running--;
});

await Async.forEachAsync(array, fn, { concurrency, useWeight: true });
expect(fn).toHaveBeenCalledTimes(8);
expect(maxRunning).toEqual(expectedConcurrency);
}
});

it('rejects if an async iterator rejects', async () => {
const expectedError: Error = new Error('iterator error');
let iteratorIndex: number = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,11 @@ export class OperationExecutionManager {
executionQueue,
async (operation: OperationExecutionRecord) => {
await operation.executeAsync(onOperationComplete);
executionQueue.assignOperations();
},
{
concurrency: maxParallelism
concurrency: maxParallelism,
useWeight: true
}
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ function createOperations(
result: OperationStatus.Skipped,
silent: true
});
operation.weight = 0.01;
} else if (changedProjects.has(project)) {
operationsWithWork.add(operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ function createShellOperations(
result: OperationStatus.NoOp,
silent: false
});
operation.weight = 0.01;
}
}
}
Expand Down