Skip to content

Commit

Permalink
fix(core): Use AbortController to Notify nodes to abort execution
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Apr 28, 2023
1 parent a72a511 commit 1d12c6a
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 5 deletions.
14 changes: 14 additions & 0 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ export class ActiveExecutions {
this.activeExecutions[executionId].responsePromise = responsePromise;
}

attachAbortController(executionId: string, abortController: AbortController): void {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
throw new Error(
`No active execution with id "${executionId}" got found to attach to workflowExecution to!`,
);
}

execution.abortController = abortController;
}

resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void {
if (this.activeExecutions[executionId] === undefined) {
return;
Expand Down Expand Up @@ -180,6 +191,9 @@ export class ActiveExecutions {
}, 1);
}
} else {
// Notify nodes to abort all operations
this.activeExecutions[executionId].abortController?.abort();

// Workflow is running in current process
this.activeExecutions[executionId].workflowExecution!.cancel();
}
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ export interface IExecutingWorkflowData {
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
workflowExecution?: PCancelable<IRun>;
abortController?: AbortController;
status: ExecutionStatus;
}

Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ async function executeWorkflow(

const runExecutionData = runData.executionData as IRunExecutionData;

// TODO: setup abortController
// Execute the workflow
const workflowExecute = new WorkflowExecute(
additionalDataIntegrated,
Expand Down
18 changes: 16 additions & 2 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ export class WorkflowRunner {
sessionId: data.sessionId,
});

const abortController = new AbortController();
if (data.executionData !== undefined) {
Logger.debug(`Execution ID ${executionId} had Execution data. Running with payload.`, {
executionId,
Expand All @@ -337,6 +338,7 @@ export class WorkflowRunner {
additionalData,
data.executionMode,
data.executionData,
abortController,
);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (
Expand All @@ -357,7 +359,12 @@ export class WorkflowRunner {
}

// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
const workflowExecute = new WorkflowExecute(
additionalData,
data.executionMode,
undefined,
abortController,
);
workflowExecution = workflowExecute.run(
workflow,
startNode,
Expand All @@ -367,7 +374,12 @@ export class WorkflowRunner {
} else {
Logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
const workflowExecute = new WorkflowExecute(
additionalData,
data.executionMode,
undefined,
abortController,
);
workflowExecution = workflowExecute.runPartialWorkflow(
workflow,
data.runData,
Expand All @@ -377,6 +389,7 @@ export class WorkflowRunner {
);
}

this.activeExecutions.attachAbortController(executionId, abortController);
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);

if (workflowTimeout > 0) {
Expand Down Expand Up @@ -620,6 +633,7 @@ export class WorkflowRunner {
// So we're just preventing crashes here.
});

// TODO: setup AbortController
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}
Expand Down
17 changes: 14 additions & 3 deletions packages/cli/src/WorkflowRunnerProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,13 @@ class WorkflowRunnerProcess {

return returnData!.data!.main;
};

const abortController = new AbortController();
if (this.data.executionData !== undefined) {
this.workflowExecute = new WorkflowExecute(
additionalData,
this.data.executionMode,
this.data.executionData,
abortController,
);
return this.workflowExecute.processRunExecutionData(this.workflow);
}
Expand All @@ -305,7 +306,12 @@ class WorkflowRunnerProcess {
}

// Can execute without webhook so go on
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
this.workflowExecute = new WorkflowExecute(
additionalData,
this.data.executionMode,
undefined,
abortController,
);
return this.workflowExecute.run(
this.workflow,
startNode,
Expand All @@ -314,7 +320,12 @@ class WorkflowRunnerProcess {
);
}
// Execute only the nodes between start and destination nodes
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
this.workflowExecute = new WorkflowExecute(
additionalData,
this.data.executionMode,
undefined,
abortController,
);
return this.workflowExecute.runPartialWorkflow(
this.workflow,
this.data.runData,
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ export class Worker extends BaseCommand {
LoggerProxy.debug(`Queued worker execution status for ${executionId} is "${status}"`);
};

// TODO: setup AbortController
let workflowExecute: WorkflowExecute;
let workflowRun: PCancelable<IRun>;
if (currentExecutionDb.data !== undefined) {
Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2287,9 +2287,14 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortController?: AbortController,
): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
onExecutionCancellation: (handler) => {
abortController?.signal?.addEventListener('abort', handler);
// TODO: clear out event listeners
},
...getCommonWorkflowFunctions(workflow, node, additionalData),
getMode: () => mode,
getCredentials: async (type, itemIndex) =>
Expand Down Expand Up @@ -2472,9 +2477,14 @@ export function getExecuteSingleFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortController?: AbortController,
): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
onExecutionCancellation: (handler) => {
abortController?.signal?.addEventListener('abort', handler);
// TODO: clear out event listeners
},
...getCommonWorkflowFunctions(workflow, node, additionalData),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export class WorkflowExecute {
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
runExecutionData?: IRunExecutionData,
private abortController?: AbortController,
) {
this.additionalData = additionalData;
this.mode = mode;
Expand Down Expand Up @@ -955,6 +956,7 @@ export class WorkflowExecute {
this.additionalData,
NodeExecuteFunctions,
this.mode,
this.abortController,
);
nodeSuccessData = runNodeData.data;

Expand Down
1 change: 1 addition & 0 deletions packages/core/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": ".",
"lib": ["dom", "es2020", "es2022.error"],
"types": ["node", "jest"],
"composite": true,
"noEmit": true,
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ export interface IGetExecuteFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortController?: AbortController,
): IExecuteFunctions;
}

Expand All @@ -443,6 +444,7 @@ export interface IGetExecuteSingleFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortController?: AbortController,
): IExecuteSingleFunctions;
}

Expand Down Expand Up @@ -744,6 +746,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getExecuteData(): IExecuteData;
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
onExecutionCancellation(handler: () => void): void;
};

export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow/src/RoutingNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export class RoutingNode {
executeData: IExecuteData,
nodeExecuteFunctions: INodeExecuteFunctions,
credentialsDecrypted?: ICredentialsDecrypted,
abortController?: AbortController,
): Promise<INodeExecutionData[][] | null | undefined> {
const items = inputData.main[0] as INodeExecutionData[];
const returnData: INodeExecutionData[] = [];
Expand All @@ -101,6 +102,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortController,
);

let credentials: ICredentialDataDecryptedObject | undefined;
Expand Down Expand Up @@ -137,6 +139,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortController,
);
const requestData: DeclarativeRestApiSettings.ResultOptions = {
options: {
Expand Down
5 changes: 5 additions & 0 deletions packages/workflow/src/Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,7 @@ export class Workflow {
additionalData: IWorkflowExecuteAdditionalData,
nodeExecuteFunctions: INodeExecuteFunctions,
mode: WorkflowExecuteMode,
abortController?: AbortController,
): Promise<IRunNodeResponse> {
const { node } = executionData;
let inputData = executionData.data;
Expand Down Expand Up @@ -1227,6 +1228,7 @@ export class Workflow {
additionalData,
executionData,
mode,
abortController,
);

returnPromises.push(nodeType.executeSingle.call(thisArgs));
Expand All @@ -1251,6 +1253,7 @@ export class Workflow {
additionalData,
executionData,
mode,
abortController,
);
return { data: await nodeType.execute.call(thisArgs) };
} else if (nodeType.poll) {
Expand Down Expand Up @@ -1329,6 +1332,8 @@ export class Workflow {
nodeType,
executionData,
nodeExecuteFunctions,
undefined,
abortController,
),
};
}
Expand Down

0 comments on commit 1d12c6a

Please sign in to comment.