Skip to content

Commit

Permalink
[rush] Add "waiting" status, fix status updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dmichon-msft committed Sep 14, 2023
1 parent 4890f4e commit f1f9894
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Add \"Waiting\" operation status for operations that have one or more dependencies still pending. Ensure that the `onOperationStatusChanged` hook fires for every status change.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
3 changes: 2 additions & 1 deletion common/reviews/api/rush-lib.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,8 @@ export enum OperationStatus {
RemoteExecuting = "REMOTE EXECUTING",
Skipped = "SKIPPED",
Success = "SUCCESS",
SuccessWithWarning = "SUCCESS WITH WARNINGS"
SuccessWithWarning = "SUCCESS WITH WARNINGS",
Waiting = "WAITING"
}

// @public (undocumented)
Expand Down
49 changes: 31 additions & 18 deletions libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ export class AsyncOperationQueue
*/
public complete(record: OperationExecutionRecord): void {
this._completedOperations.add(record);

// Apply status changes to direct dependents
for (const item of record.consumers) {
// Remove this operation from the dependencies, to unblock the scheduler
if (
item.dependencies.delete(record) &&
item.dependencies.size === 0 &&
item.status === OperationStatus.Waiting
) {
item.status = OperationStatus.Ready;
}
}

if (this._completedOperations.size === this._totalOperations) {
this._isDone = true;
}
Expand All @@ -88,39 +101,39 @@ export class AsyncOperationQueue

// By iterating in reverse order we do less array shuffling when removing operations
for (let i: number = queue.length - 1; waitingIterators.length > 0 && i >= 0; i--) {
const operation: OperationExecutionRecord = queue[i];
const record: OperationExecutionRecord = queue[i];

if (
operation.status === OperationStatus.Blocked ||
operation.status === OperationStatus.Skipped ||
operation.status === OperationStatus.Success ||
operation.status === OperationStatus.SuccessWithWarning ||
operation.status === OperationStatus.FromCache ||
operation.status === OperationStatus.NoOp ||
operation.status === OperationStatus.Failure
record.status === OperationStatus.Blocked ||
record.status === OperationStatus.Skipped ||
record.status === OperationStatus.Success ||
record.status === OperationStatus.SuccessWithWarning ||
record.status === OperationStatus.FromCache ||
record.status === OperationStatus.NoOp ||
record.status === OperationStatus.Failure
) {
// It shouldn't be on the queue, remove it
queue.splice(i, 1);
} else if (
operation.status === OperationStatus.Queued ||
operation.status === OperationStatus.Executing
) {
} else if (record.status === OperationStatus.Queued || record.status === OperationStatus.Executing) {
// This operation is currently executing
// next one plz :)
} else if (record.status === OperationStatus.Waiting) {
// This operation is not yet ready to be executed
// next one plz :)
continue;
} else if (operation.status === OperationStatus.RemoteExecuting) {
} else if (record.status === OperationStatus.RemoteExecuting) {
// This operation is not ready to execute yet, but it may become ready later
// next one plz :)
continue;
} else if (operation.status !== OperationStatus.Ready) {
} else if (record.status !== OperationStatus.Ready) {
// Sanity check
throw new Error(`Unexpected status "${operation.status}" for queued operation: ${operation.name}`);
} else if (operation.dependencies.size === 0) {
throw new Error(`Unexpected status "${record.status}" for queued operation: ${record.name}`);
} else {
// This task is ready to process, hand it to the iterator.
// Needs to have queue semantics, otherwise tools that iterate it get confused
operation.status = OperationStatus.Queued;
record.status = OperationStatus.Queued;
waitingIterators.shift()!({
value: operation,
value: record,
done: false
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const TIMELINE_WIDTH: number = 109;
* Timeline - symbols representing each operation status
*/
const TIMELINE_CHART_SYMBOLS: Record<OperationStatus, string> = {
[OperationStatus.Waiting]: '?',
[OperationStatus.Ready]: '?',
[OperationStatus.Queued]: '?',
[OperationStatus.Executing]: '?',
Expand All @@ -92,6 +93,7 @@ const TIMELINE_CHART_SYMBOLS: Record<OperationStatus, string> = {
* Timeline - colorizer for each operation status
*/
const TIMELINE_CHART_COLORIZER: Record<OperationStatus, (string: string) => string> = {
[OperationStatus.Waiting]: colors.yellow,
[OperationStatus.Ready]: colors.yellow,
[OperationStatus.Queued]: colors.yellow,
[OperationStatus.Executing]: colors.yellow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,6 @@ export class OperationExecutionManager {
if (record.status !== OperationStatus.RemoteExecuting) {
// If the operation was not remote, then we can notify queue that it is complete
this._executionQueue.complete(record);

// Apply status changes to direct dependents
for (const item of record.consumers) {
// Remove this operation from the dependencies, to unblock the scheduler
item.dependencies.delete(record);
}
}
}
}
31 changes: 20 additions & 11 deletions libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
*/
public readonly operation: Operation;

/**
* The current execution status of an operation. Operations start in the 'ready' state,
* but can be 'blocked' if an upstream operation failed. It is 'executing' when
* the operation is executing. Once execution is complete, it is either 'success' or
* 'failure'.
*/
public status: OperationStatus = OperationStatus.Ready;

/**
* The error which occurred while executing this operation, this is stored in case we need
* it later (for example to re-print errors at end of execution).
Expand Down Expand Up @@ -101,6 +93,7 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
private readonly _context: IOperationExecutionRecordContext;

private _collatedWriter: CollatedWriter | undefined = undefined;
private _status: OperationStatus;

public constructor(operation: Operation, context: IOperationExecutionRecordContext) {
const { runner, associatedPhase, associatedProject } = operation;
Expand All @@ -123,6 +116,7 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
});
}
this._context = context;
this._status = operation.dependencies.size > 0 ? OperationStatus.Waiting : OperationStatus.Ready;
}

public get name(): string {
Expand Down Expand Up @@ -159,6 +153,23 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
return this._operationMetadataManager?.stateFile.state?.cobuildRunnerId;
}

/**
* The current execution status of an operation. Operations start in the 'ready' state,
* but can be 'blocked' if an upstream operation failed. It is 'executing' when
* the operation is executing. Once execution is complete, it is either 'success' or
* 'failure'.
*/
public get status(): OperationStatus {
return this._status;
}
public set status(newStatus: OperationStatus) {
if (newStatus === this._status) {
return;
}
this._status = newStatus;
this._context.onOperationStatusChanged?.(this);
}

public async executeAsync({
onStart,
onResult
Expand All @@ -169,9 +180,8 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
if (this.status === OperationStatus.RemoteExecuting) {
this.stopwatch.reset();
}
this.status = OperationStatus.Executing;
this.stopwatch.start();
this._context.onOperationStatusChanged?.(this);
this.status = OperationStatus.Executing;

try {
const earlyReturnStatus: OperationStatus | undefined = await onStart(this);
Expand All @@ -194,7 +204,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
this.stdioSummarizer.close();
this.stopwatch.stop();
}
this._context.onOperationStatusChanged?.(this);
}
}
}
6 changes: 5 additions & 1 deletion libraries/rush-lib/src/logic/operations/OperationStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
*/
export enum OperationStatus {
/**
* The Operation is on the queue, ready to execute (but may be waiting for dependencies)
* The Operation is ready to execute. All its dependencies have succeeded.
*/
Ready = 'READY',
/**
* The Operation is waiting for one or more dependencies to complete.
*/
Waiting = 'WAITING',
/**
* The Operation is Queued
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { Async } from '@rushstack/node-core-library';
function addDependency(consumer: OperationExecutionRecord, dependency: OperationExecutionRecord): void {
consumer.dependencies.add(dependency);
dependency.consumers.add(consumer);
consumer.status = OperationStatus.Waiting;
}

function nullSort(a: OperationExecutionRecord, b: OperationExecutionRecord): number {
Expand Down Expand Up @@ -50,9 +51,6 @@ describe(AsyncOperationQueue.name, () => {
hasUnassignedOperation = true;
continue;
}
for (const consumer of operation.consumers) {
consumer.dependencies.delete(operation);
}
operation.status = OperationStatus.Success;
queue.complete(operation);
}
Expand Down Expand Up @@ -82,9 +80,6 @@ describe(AsyncOperationQueue.name, () => {
hasUnassignedOperation = true;
continue;
}
for (const consumer of operation.consumers) {
consumer.dependencies.delete(operation);
}
operation.status = OperationStatus.Success;
queue.complete(operation);
}
Expand Down Expand Up @@ -151,10 +146,6 @@ describe(AsyncOperationQueue.name, () => {

await Promise.resolve();

for (const consumer of operation.consumers) {
consumer.dependencies.delete(operation);
}

--concurrency;
operation.status = OperationStatus.Success;
queue.complete(operation);
Expand Down Expand Up @@ -213,9 +204,6 @@ describe(AsyncOperationQueue.name, () => {
continue;
}
}
for (const consumer of record.consumers) {
consumer.dependencies.delete(record);
}
record.status = OperationStatus.Success;
queue.complete(record);
}
Expand Down

0 comments on commit f1f9894

Please sign in to comment.