Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Use AbortController to notify nodes to abort execution #6141

Merged
merged 17 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions packages/core/src/NodeExecuteFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2508,6 +2508,19 @@ const getCommonWorkflowFunctions = (
prepareOutputData: async (outputData) => [outputData],
});

const executionCancellationFunctions = (
abortSignal?: AbortSignal,
): Pick<IExecuteFunctions, 'onExecutionCancellation' | 'getExecutionCancelSignal'> => ({
getExecutionCancelSignal: () => abortSignal,
onExecutionCancellation: (handler) => {
ivov marked this conversation as resolved.
Show resolved Hide resolved
const fn = () => {
abortSignal?.removeEventListener('abort', fn);
handler();
};
abortSignal?.addEventListener('abort', fn);
},
});

const getRequestHelperFunctions = (
workflow: Workflow,
node: INode,
Expand Down Expand Up @@ -3087,10 +3100,12 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
getMode: () => mode,
getCredentials: async (type, itemIndex) =>
getCredentials(
Expand Down Expand Up @@ -3512,10 +3527,12 @@ export function getExecuteSingleFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;
Expand Down
46 changes: 20 additions & 26 deletions packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
/* eslint-disable @typescript-eslint/prefer-optional-chain */

/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */

/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
import { setMaxListeners } from 'events';
import PCancelable from 'p-cancelable';

import type {
Expand Down Expand Up @@ -44,23 +43,14 @@ import get from 'lodash/get';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';

export class WorkflowExecute {
runExecutionData: IRunExecutionData;

private additionalData: IWorkflowExecuteAdditionalData;

private mode: WorkflowExecuteMode;
private status: ExecutionStatus = 'new';

private status: ExecutionStatus;
private readonly abortController = new AbortController();

constructor(
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
runExecutionData?: IRunExecutionData,
) {
this.additionalData = additionalData;
this.mode = mode;
this.status = 'new';
this.runExecutionData = runExecutionData || {
private readonly additionalData: IWorkflowExecuteAdditionalData,
private readonly mode: WorkflowExecuteMode,
private runExecutionData: IRunExecutionData = {
startData: {},
resultData: {
runData: {},
Expand All @@ -73,8 +63,8 @@ export class WorkflowExecute {
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
},
) {}

/**
* Executes the given workflow.
Expand Down Expand Up @@ -830,11 +820,16 @@ export class WorkflowExecute {
let closeFunction: Promise<void> | undefined;

return new PCancelable(async (resolve, reject, onCancel) => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once this PR is merged, I'm going to get rid of PCancelable and use a regular promise with AbortController instead.
Even p-cancelable recommends to do that since 3 years.
Moving away from PCancelable should also make this code more readable.

let gotCancel = false;
// Let as many nodes listen to the abort signal, without getting the MaxListenersExceededWarning
setMaxListeners(Infinity, this.abortController.signal);

onCancel.shouldReject = false;
onCancel(() => {
gotCancel = true;
this.status = 'canceled';
this.abortController.abort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can pass in a reason arg - all cancellations that can happen right now are user-requested? Else we might want to use this arg to differentiate between user-requested cancellations vs. cancellations initiated on our end?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. we can have cancellation from user interaction, timeout, or server shutdown.
I've created a backlog ticket: ENG-92.

const fullRunData = this.getFullRunData(startedAt);
void this.executeHook('workflowExecuteAfter', [fullRunData]);
setTimeout(() => resolve(fullRunData), 10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For understanding - We are resolving this PCancelable inside a timeout in order to allow for other promises to resolve first, i.e. to force this promise to be placed in the macrotask queue, processed after the microtask queue. Is this correct? Else why are we doing this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. we have a bunch of async operations (that don't really take any time, and possibly don't even need to be async) that need to happen between the cancellation, and before this resolve is called. I don't like this, and plan to fix this in the same PR where I'll remove PCancelable completely.

});

const returnPromise = (async () => {
Expand Down Expand Up @@ -881,10 +876,10 @@ export class WorkflowExecute {
this.additionalData.executionTimeoutTimestamp !== undefined &&
Date.now() >= this.additionalData.executionTimeoutTimestamp
) {
gotCancel = true;
this.status = 'canceled';
}

if (gotCancel) {
if (this.status === 'canceled') {
return;
}

Expand Down Expand Up @@ -1014,9 +1009,6 @@ export class WorkflowExecute {
}

for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
if (gotCancel) {
return;
}
try {
if (tryIndex !== 0) {
// Reset executionError from previous error try
Expand Down Expand Up @@ -1052,6 +1044,7 @@ export class WorkflowExecute {
this.additionalData,
NodeExecuteFunctions,
this.mode,
this.abortController.signal,
);
nodeSuccessData = runNodeData.data;

Expand Down Expand Up @@ -1089,6 +1082,7 @@ export class WorkflowExecute {
this.additionalData,
executionData,
this.mode,
this.abortController.signal,
);
const dataProxy = executeFunctions.getWorkflowDataProxy(0);

Expand Down Expand Up @@ -1644,7 +1638,7 @@ export class WorkflowExecute {
return;
})()
.then(async () => {
if (gotCancel && executionError === undefined) {
if (this.status === 'canceled' && executionError === undefined) {
return this.processSuccessExecution(
startedAt,
workflow,
Expand Down
20 changes: 16 additions & 4 deletions packages/editor-ui/src/mixins/pushConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ export const pushConnection = defineComponent({
return false;
}

if (this.workflowsStore.activeExecutionId !== pushData.executionId) {
const { activeExecutionId } = this.workflowsStore;
if (activeExecutionId !== pushData.executionId) {
// The workflow which did finish execution did either not get started
// by this session or we do not have the execution id yet.
if (isRetry !== true) {
Expand All @@ -285,10 +286,17 @@ export const pushConnection = defineComponent({

let runDataExecutedErrorMessage = this.getExecutionError(runDataExecuted.data);

if (pushData.data.status === 'crashed') {
if (runDataExecuted.status === 'crashed') {
runDataExecutedErrorMessage = this.$locale.baseText(
'pushConnection.executionFailed.message',
);
} else if (runDataExecuted.status === 'canceled') {
runDataExecutedErrorMessage = this.$locale.baseText(
'executionsList.showMessage.stopExecution.message',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using canceled internally but stopped externally - we might want to unify for consistency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that we should unify these. Created a ticket ENG-93

{
interpolate: { activeExecutionId },
},
);
}

const lineNumber = runDataExecuted?.data?.resultData?.error?.lineNumber;
Expand Down Expand Up @@ -389,7 +397,11 @@ export const pushConnection = defineComponent({
});
} else {
let title: string;
if (runDataExecuted.data.resultData.lastNodeExecuted) {
let type = 'error';
if (runDataExecuted.status === 'canceled') {
title = this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title');
type = 'warning';
} else if (runDataExecuted.data.resultData.lastNodeExecuted) {
title = `Problem in node ‘${runDataExecuted.data.resultData.lastNodeExecuted}‘`;
} else {
title = 'Problem executing workflow';
Expand All @@ -398,7 +410,7 @@ export const pushConnection = defineComponent({
this.showMessage({
title,
message: runDataExecutedErrorMessage,
type: 'error',
type,
duration: 0,
dangerouslyUseHTMLString: true,
});
Expand Down
4 changes: 0 additions & 4 deletions packages/editor-ui/src/views/NodeView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -1562,10 +1562,6 @@ export default defineComponent({
try {
this.stopExecutionInProgress = true;
await this.workflowsStore.stopCurrentExecution(executionId);
this.showMessage({
title: this.$locale.baseText('nodeView.showMessage.stopExecutionTry.title'),
type: 'success',
});
} catch (error) {
// Execution stop might fail when the execution has already finished. Let's treat this here.
const execution = await this.workflowsStore.getExecution(executionId);
Expand Down
7 changes: 3 additions & 4 deletions packages/nodes-base/nodes/Wait/Wait.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,9 @@ export class Wait extends Webhook {
if (waitValue < 65000) {
// If wait time is shorter than 65 seconds leave execution active because
// we just check the database every 60 seconds.
return new Promise((resolve, _reject) => {
setTimeout(() => {
resolve([context.getInputData()]);
}, waitValue);
return new Promise((resolve) => {
const timer = setTimeout(() => resolve([context.getInputData()]), waitValue);
context.onExecutionCancellation(() => clearTimeout(timer));
});
}

Expand Down
4 changes: 4 additions & 0 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ export interface IGetExecuteFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions;
}

Expand All @@ -437,6 +438,7 @@ export interface IGetExecuteSingleFunctions {
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions;
}

Expand Down Expand Up @@ -776,6 +778,8 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getExecuteData(): IExecuteData;
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
};

// TODO: Create later own type only for Config-Nodes
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow/src/RoutingNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class RoutingNode {
executeData: IExecuteData,
nodeExecuteFunctions: INodeExecuteFunctions,
credentialsDecrypted?: ICredentialsDecrypted,
abortSignal?: AbortSignal,
): Promise<INodeExecutionData[][] | null | undefined> {
const items = inputData.main[0] as INodeExecutionData[];
const returnData: INodeExecutionData[] = [];
Expand All @@ -99,6 +100,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortSignal,
);

let credentials: ICredentialDataDecryptedObject | undefined;
Expand Down Expand Up @@ -136,6 +138,7 @@ export class RoutingNode {
this.additionalData,
executeData,
this.mode,
abortSignal,
);
const requestData: DeclarativeRestApiSettings.ResultOptions = {
options: {
Expand Down
4 changes: 4 additions & 0 deletions packages/workflow/src/Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ export class Workflow {
additionalData: IWorkflowExecuteAdditionalData,
nodeExecuteFunctions: INodeExecuteFunctions,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
const { node } = executionData;
let inputData = executionData.data;
Expand Down Expand Up @@ -1303,6 +1304,7 @@ export class Workflow {
additionalData,
executionData,
mode,
abortSignal,
);
const data =
nodeType instanceof Node
Expand Down Expand Up @@ -1385,6 +1387,8 @@ export class Workflow {
nodeType,
executionData,
nodeExecuteFunctions,
undefined,
abortSignal,
),
};
}
Expand Down
Loading