From 1d12c6af4428f29241256f3e8de799c64a24d67e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Fri, 28 Apr 2023 16:58:15 +0200 Subject: [PATCH] fix(core): Use AbortController to Notify nodes to abort execution --- packages/cli/src/ActiveExecutions.ts | 14 ++++++++++++++ packages/cli/src/Interfaces.ts | 1 + .../cli/src/WorkflowExecuteAdditionalData.ts | 1 + packages/cli/src/WorkflowRunner.ts | 18 ++++++++++++++++-- packages/cli/src/WorkflowRunnerProcess.ts | 17 ++++++++++++++--- packages/cli/src/commands/worker.ts | 1 + packages/core/src/NodeExecuteFunctions.ts | 10 ++++++++++ packages/core/src/WorkflowExecute.ts | 2 ++ packages/core/tsconfig.json | 1 + packages/workflow/src/Interfaces.ts | 3 +++ packages/workflow/src/RoutingNode.ts | 3 +++ packages/workflow/src/Workflow.ts | 5 +++++ 12 files changed, 71 insertions(+), 5 deletions(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 590cba36cc713..aab3ad64129d7 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -126,6 +126,17 @@ export class ActiveExecutions { this.activeExecutions[executionId].responsePromise = responsePromise; } + attachAbortController(executionId: string, abortController: AbortController): void { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { + throw new Error( + `No active execution with id "${executionId}" got found to attach to workflowExecution to!`, + ); + } + + execution.abortController = abortController; + } + resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void { if (this.activeExecutions[executionId] === undefined) { return; @@ -180,6 +191,9 @@ export class ActiveExecutions { }, 1); } } else { + // Notify nodes to abort all operations + this.activeExecutions[executionId].abortController?.abort(); + // Workflow is running in current process this.activeExecutions[executionId].workflowExecution!.cancel(); } diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 63f0a6b6beea0..88545ec8dda3c 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -266,6 +266,7 @@ export interface IExecutingWorkflowData { postExecutePromises: Array>; responsePromise?: IDeferredPromise; workflowExecution?: PCancelable; + abortController?: AbortController; status: ExecutionStatus; } diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index f958d3a49508d..7f690d171e139 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -1027,6 +1027,7 @@ async function executeWorkflow( const runExecutionData = runData.executionData as IRunExecutionData; + // TODO: setup abortController // Execute the workflow const workflowExecute = new WorkflowExecute( additionalDataIntegrated, diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index c785a2fa22160..731eefd5828f5 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -329,6 +329,7 @@ export class WorkflowRunner { sessionId: data.sessionId, }); + const abortController = new AbortController(); if (data.executionData !== undefined) { Logger.debug(`Execution ID ${executionId} had Execution data. Running with payload.`, { executionId, @@ -337,6 +338,7 @@ export class WorkflowRunner { additionalData, data.executionMode, data.executionData, + abortController, ); workflowExecution = workflowExecute.processRunExecutionData(workflow); } else if ( @@ -357,7 +359,12 @@ export class WorkflowRunner { } // Can execute without webhook so go on - const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + const workflowExecute = new WorkflowExecute( + additionalData, + data.executionMode, + undefined, + abortController, + ); workflowExecution = workflowExecute.run( workflow, startNode, @@ -367,7 +374,12 @@ export class WorkflowRunner { } else { Logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId }); // Execute only the nodes between start and destination nodes - const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + const workflowExecute = new WorkflowExecute( + additionalData, + data.executionMode, + undefined, + abortController, + ); workflowExecution = workflowExecute.runPartialWorkflow( workflow, data.runData, @@ -377,6 +389,7 @@ export class WorkflowRunner { ); } + this.activeExecutions.attachAbortController(executionId, abortController); this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); if (workflowTimeout > 0) { @@ -620,6 +633,7 @@ export class WorkflowRunner { // So we're just preventing crashes here. }); + // TODO: setup AbortController this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); return executionId; } diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 3b61ed11fab82..eee586ab4e202 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -279,12 +279,13 @@ class WorkflowRunnerProcess { return returnData!.data!.main; }; - + const abortController = new AbortController(); if (this.data.executionData !== undefined) { this.workflowExecute = new WorkflowExecute( additionalData, this.data.executionMode, this.data.executionData, + abortController, ); return this.workflowExecute.processRunExecutionData(this.workflow); } @@ -305,7 +306,12 @@ class WorkflowRunnerProcess { } // Can execute without webhook so go on - this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); + this.workflowExecute = new WorkflowExecute( + additionalData, + this.data.executionMode, + undefined, + abortController, + ); return this.workflowExecute.run( this.workflow, startNode, @@ -314,7 +320,12 @@ class WorkflowRunnerProcess { ); } // Execute only the nodes between start and destination nodes - this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); + this.workflowExecute = new WorkflowExecute( + additionalData, + this.data.executionMode, + undefined, + abortController, + ); return this.workflowExecute.runPartialWorkflow( this.workflow, this.data.runData, diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 48d2269d0e171..c17b50aa81e9a 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -191,6 +191,7 @@ export class Worker extends BaseCommand { LoggerProxy.debug(`Queued worker execution status for ${executionId} is "${status}"`); }; + // TODO: setup AbortController let workflowExecute: WorkflowExecute; let workflowRun: PCancelable; if (currentExecutionDb.data !== undefined) { diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 5d86a914a0d08..09555490982f2 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -2287,9 +2287,14 @@ export function getExecuteFunctions( additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, mode: WorkflowExecuteMode, + abortController?: AbortController, ): IExecuteFunctions { return ((workflow, runExecutionData, connectionInputData, inputData, node) => { return { + onExecutionCancellation: (handler) => { + abortController?.signal?.addEventListener('abort', handler); + // TODO: clear out event listeners + }, ...getCommonWorkflowFunctions(workflow, node, additionalData), getMode: () => mode, getCredentials: async (type, itemIndex) => @@ -2472,9 +2477,14 @@ export function getExecuteSingleFunctions( additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, mode: WorkflowExecuteMode, + abortController?: AbortController, ): IExecuteSingleFunctions { return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => { return { + onExecutionCancellation: (handler) => { + abortController?.signal?.addEventListener('abort', handler); + // TODO: clear out event listeners + }, ...getCommonWorkflowFunctions(workflow, node, additionalData), continueOnFail: () => continueOnFail(node), evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => { diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 00e7a3f84f01c..46ea1b67f743a 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -53,6 +53,7 @@ export class WorkflowExecute { additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, runExecutionData?: IRunExecutionData, + private abortController?: AbortController, ) { this.additionalData = additionalData; this.mode = mode; @@ -955,6 +956,7 @@ export class WorkflowExecute { this.additionalData, NodeExecuteFunctions, this.mode, + this.abortController, ); nodeSuccessData = runNodeData.data; diff --git a/packages/core/tsconfig.json b/packages/core/tsconfig.json index 69536da44842a..853c859202fb1 100644 --- a/packages/core/tsconfig.json +++ b/packages/core/tsconfig.json @@ -2,6 +2,7 @@ "extends": "../../tsconfig.json", "compilerOptions": { "rootDir": ".", + "lib": ["dom", "es2020", "es2022.error"], "types": ["node", "jest"], "composite": true, "noEmit": true, diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index aff7f372de289..00eb8679e5049 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -428,6 +428,7 @@ export interface IGetExecuteFunctions { additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, mode: WorkflowExecuteMode, + abortController?: AbortController, ): IExecuteFunctions; } @@ -443,6 +444,7 @@ export interface IGetExecuteSingleFunctions { additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, mode: WorkflowExecuteMode, + abortController?: AbortController, ): IExecuteSingleFunctions; } @@ -744,6 +746,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { getExecuteData(): IExecuteData; getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData; getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; + onExecutionCancellation(handler: () => void): void; }; export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & diff --git a/packages/workflow/src/RoutingNode.ts b/packages/workflow/src/RoutingNode.ts index 727fe5efca208..50cf2e3a760bb 100644 --- a/packages/workflow/src/RoutingNode.ts +++ b/packages/workflow/src/RoutingNode.ts @@ -81,6 +81,7 @@ export class RoutingNode { executeData: IExecuteData, nodeExecuteFunctions: INodeExecuteFunctions, credentialsDecrypted?: ICredentialsDecrypted, + abortController?: AbortController, ): Promise { const items = inputData.main[0] as INodeExecutionData[]; const returnData: INodeExecutionData[] = []; @@ -101,6 +102,7 @@ export class RoutingNode { this.additionalData, executeData, this.mode, + abortController, ); let credentials: ICredentialDataDecryptedObject | undefined; @@ -137,6 +139,7 @@ export class RoutingNode { this.additionalData, executeData, this.mode, + abortController, ); const requestData: DeclarativeRestApiSettings.ResultOptions = { options: { diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index a3befc4580b46..e727ee4c5ea24 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -1144,6 +1144,7 @@ export class Workflow { additionalData: IWorkflowExecuteAdditionalData, nodeExecuteFunctions: INodeExecuteFunctions, mode: WorkflowExecuteMode, + abortController?: AbortController, ): Promise { const { node } = executionData; let inputData = executionData.data; @@ -1227,6 +1228,7 @@ export class Workflow { additionalData, executionData, mode, + abortController, ); returnPromises.push(nodeType.executeSingle.call(thisArgs)); @@ -1251,6 +1253,7 @@ export class Workflow { additionalData, executionData, mode, + abortController, ); return { data: await nodeType.execute.call(thisArgs) }; } else if (nodeType.poll) { @@ -1329,6 +1332,8 @@ export class Workflow { nodeType, executionData, nodeExecuteFunctions, + undefined, + abortController, ), }; }