Skip to content

Commit

Permalink
fix(core): Use AbortController to Notify nodes to abort execution
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Apr 28, 2023
1 parent a72a511 commit 5a0b49d
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 65 deletions.
95 changes: 50 additions & 45 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import * as ResponseHelper from '@/ResponseHelper';
import { isWorkflowIdValid } from '@/utils';
import { Service } from 'typedi';

type ExecutionStopCause = 'timeout' | 'stopExecution';

@Service()
export class ActiveExecutions {
private activeExecutions: {
[index: string]: IExecutingWorkflowData;
} = {};
private activeExecutions: Record<string, IExecutingWorkflowData> = {};

/**
* Add a new active execution
Expand Down Expand Up @@ -100,53 +100,60 @@ export class ActiveExecutions {

/**
* Attaches an execution
*
*/
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
attachWorkflowExecution(executionId: string, workflowExecution: PCancelable<IRun>) {
if (this.activeExecutions[executionId] === undefined) {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
throw new Error(
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
);
}

this.activeExecutions[executionId].workflowExecution = workflowExecution;
execution.workflowExecution = workflowExecution;
}

attachResponsePromise(
executionId: string,
responsePromise: IDeferredPromise<IExecuteResponsePromiseData>,
): void {
if (this.activeExecutions[executionId] === undefined) {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
throw new Error(
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
);
}

this.activeExecutions[executionId].responsePromise = responsePromise;
execution.responsePromise = responsePromise;
}

resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
if (this.activeExecutions[executionId] === undefined) {
return;
attachAbortController(executionId: string, abortController: AbortController): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
throw new Error(
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
);
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
this.activeExecutions[executionId].responsePromise?.resolve(response);
execution.abortController = abortController;
}

resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) return;
execution.responsePromise?.resolve(response);
}

/**
* Remove an active execution
*
*/
remove(executionId: string, fullRunData?: IRun): void {
if (this.activeExecutions[executionId] === undefined) {
return;
}
const execution = this.activeExecutions[executionId];
if (execution === undefined) return;

// Resolve all the waiting promises
// eslint-disable-next-line no-restricted-syntax
for (const promise of this.activeExecutions[executionId].postExecutePromises) {
for (const promise of execution.postExecutePromises) {
promise.resolve(fullRunData);
}

Expand All @@ -156,32 +163,31 @@ export class ActiveExecutions {

/**
* Forces an execution to stop
*
* @param {string} executionId The id of the execution to stop
* @param {string} timeout String 'timeout' given if stop due to timeout
*/
async stopExecution(executionId: string, timeout?: string): Promise<IRun | undefined> {
if (this.activeExecutions[executionId] === undefined) {
// There is no execution running with that id
return;
}
async stopExecution(
executionId: string,
type: ExecutionStopCause = 'stopExecution',
): Promise<IRun | undefined> {
const execution = this.activeExecutions[executionId];

// There is no execution running with that id
if (execution === undefined) return;

const { process } = execution;

// In case something goes wrong make sure that promise gets first
// returned that it gets then also resolved correctly.
if (this.activeExecutions[executionId].process !== undefined) {
if (process !== undefined) {
// Workflow is running in subprocess
if (this.activeExecutions[executionId].process!.connected) {
setTimeout(() => {
// execute on next event loop tick;
this.activeExecutions[executionId].process!.send({
// eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing
type: timeout || 'stopExecution',
});
}, 1);
if (process.connected) {
// execute on next event loop tick
setImmediate(() => process.send({ type }));
}
} else {
// Notify nodes to abort all operations
execution.abortController?.abort();
// Workflow is running in current process
this.activeExecutions[executionId].workflowExecution!.cancel();
execution.workflowExecution!.cancel();
}

// eslint-disable-next-line consistent-return
Expand All @@ -195,14 +201,15 @@ export class ActiveExecutions {
* @param {string} executionId The id of the execution to wait for
*/
async getPostExecutePromise(executionId: string): Promise<IRun | undefined> {
const execution = this.activeExecutions[executionId];
// Create the promise which will be resolved when the execution finished
const waitPromise = await createDeferredPromise<IRun | undefined>();

if (this.activeExecutions[executionId] === undefined) {
if (execution === undefined) {
throw new Error(`There is no active execution with id "${executionId}".`);
}

this.activeExecutions[executionId].postExecutePromises.push(waitPromise);
execution.postExecutePromises.push(waitPromise);

// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-member-access
return waitPromise.promise();
Expand Down Expand Up @@ -233,21 +240,19 @@ export class ActiveExecutions {
}

async setStatus(executionId: string, status: ExecutionStatus): Promise<void> {
if (this.activeExecutions[executionId] === undefined) {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
LoggerProxy.debug(
`There is no active execution with id "${executionId}", can't update status to ${status}.`,
);
return;
}

this.activeExecutions[executionId].status = status;
execution.status = status;
}

getStatus(executionId: string): ExecutionStatus {
if (this.activeExecutions[executionId] === undefined) {
return 'unknown';
}

return this.activeExecutions[executionId].status;
const execution = this.activeExecutions[executionId];
return execution?.status ?? 'unknown';
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ export interface IExecutingWorkflowData {
startedAt: Date;
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
/** @deprecated Use abortController with Promise instead */
workflowExecution?: PCancelable<IRun>;
abortController?: AbortController;
status: ExecutionStatus;
}

Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ async function executeWorkflow(

const runExecutionData = runData.executionData as IRunExecutionData;

// TODO: setup abortController
// Execute the workflow
const workflowExecute = new WorkflowExecute(
additionalDataIntegrated,
Expand Down
18 changes: 16 additions & 2 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ export class WorkflowRunner {
sessionId: data.sessionId,
});

const abortController = new AbortController();
if (data.executionData !== undefined) {
Logger.debug(`Execution ID ${executionId} had Execution data. Running with payload.`, {
executionId,
Expand All @@ -337,6 +338,7 @@ export class WorkflowRunner {
additionalData,
data.executionMode,
data.executionData,
abortController,
);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (
Expand All @@ -357,7 +359,12 @@ export class WorkflowRunner {
}

// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
const workflowExecute = new WorkflowExecute(
additionalData,
data.executionMode,
undefined,
abortController,
);
workflowExecution = workflowExecute.run(
workflow,
startNode,
Expand All @@ -367,7 +374,12 @@ export class WorkflowRunner {
} else {
Logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
const workflowExecute = new WorkflowExecute(
additionalData,
data.executionMode,
undefined,
abortController,
);
workflowExecution = workflowExecute.runPartialWorkflow(
workflow,
data.runData,
Expand All @@ -377,6 +389,7 @@ export class WorkflowRunner {
);
}

this.activeExecutions.attachAbortController(executionId, abortController);
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);

if (workflowTimeout > 0) {
Expand Down Expand Up @@ -620,6 +633,7 @@ export class WorkflowRunner {
// So we're just preventing crashes here.
});

// TODO: setup AbortController
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}
Expand Down
17 changes: 14 additions & 3 deletions packages/cli/src/WorkflowRunnerProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,13 @@ class WorkflowRunnerProcess {

return returnData!.data!.main;
};

const abortController = new AbortController();
if (this.data.executionData !== undefined) {
this.workflowExecute = new WorkflowExecute(
additionalData,
this.data.executionMode,
this.data.executionData,
abortController,
);
return this.workflowExecute.processRunExecutionData(this.workflow);
}
Expand All @@ -305,7 +306,12 @@ class WorkflowRunnerProcess {
}

// Can execute without webhook so go on
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
this.workflowExecute = new WorkflowExecute(
additionalData,
this.data.executionMode,
undefined,
abortController,
);
return this.workflowExecute.run(
this.workflow,
startNode,
Expand All @@ -314,7 +320,12 @@ class WorkflowRunnerProcess {
);
}
// Execute only the nodes between start and destination nodes
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
this.workflowExecute = new WorkflowExecute(
additionalData,
this.data.executionMode,
undefined,
abortController,
);
return this.workflowExecute.runPartialWorkflow(
this.workflow,
this.data.runData,
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ export class Worker extends BaseCommand {
LoggerProxy.debug(`Queued worker execution status for ${executionId} is "${status}"`);
};

// TODO: setup AbortController
let workflowExecute: WorkflowExecute;
let workflowRun: PCancelable<IRun>;
if (currentExecutionDb.data !== undefined) {
Expand Down
24 changes: 17 additions & 7 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ import type {
RequestHelperFunctions,
FunctionsBase,
IExecuteFunctions,
IExecuteSingleFunctions,
IGetExecuteFunctions,
IGetExecuteSingleFunctions,
IHookFunctions,
ILoadOptionsFunctions,
IPollFunctions,
Expand Down Expand Up @@ -2277,7 +2278,7 @@ export function getExecuteTriggerFunctions(
/**
* Returns the execute functions regular nodes have access to.
*/
export function getExecuteFunctions(
export const getExecuteFunctions: IGetExecuteFunctions = (
workflow: Workflow,
runExecutionData: IRunExecutionData,
runIndex: number,
Expand All @@ -2287,9 +2288,13 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
): IExecuteFunctions {
abortController?: AbortController,
) => {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
onExecutionCancellation: (handler) => {
abortController?.signal?.addEventListener('abort', handler);
},
...getCommonWorkflowFunctions(workflow, node, additionalData),
getMode: () => mode,
getCredentials: async (type, itemIndex) =>
Expand Down Expand Up @@ -2456,12 +2461,12 @@ export function getExecuteFunctions(
nodeHelpers: getNodeHelperFunctions(additionalData),
};
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
}
};

/**
* Returns the execute functions regular nodes have access to when single-function is defined.
*/
export function getExecuteSingleFunctions(
export const getExecuteSingleFunctions: IGetExecuteSingleFunctions = (
workflow: Workflow,
runExecutionData: IRunExecutionData,
runIndex: number,
Expand All @@ -2472,9 +2477,14 @@ export function getExecuteSingleFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
): IExecuteSingleFunctions {
abortController?: AbortController,
) => {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
onExecutionCancellation: (handler) => {
abortController?.signal?.addEventListener('abort', handler);
// TODO: clear out event listeners
},
...getCommonWorkflowFunctions(workflow, node, additionalData),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
Expand Down Expand Up @@ -2594,7 +2604,7 @@ export function getExecuteSingleFunctions(
},
};
})(workflow, runExecutionData, connectionInputData, inputData, node, itemIndex);
}
};

export function getCredentialTestFunctions(): ICredentialTestFunctions {
return {
Expand Down
Loading

0 comments on commit 5a0b49d

Please sign in to comment.