From 3a3527a44233981f865c9eb508c73703451f6789 Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Mon, 12 Feb 2024 11:36:34 +0100 Subject: [PATCH] Revert "feat(core): Remove `own` execution-process mode (#8490)" This reverts commit 121a55b691469e7eb042737573c0ace276366ecb. --- packages/cli/src/ActiveExecutions.ts | 30 +- packages/cli/src/Interfaces.ts | 12 + packages/cli/src/InternalHooks.ts | 1 + packages/cli/src/WorkflowRunner.ts | 213 +++++++- packages/cli/src/WorkflowRunnerProcess.ts | 506 ++++++++++++++++++ packages/cli/src/commands/BaseCommand.ts | 4 +- packages/cli/src/config/index.ts | 1 + packages/cli/src/config/schema.ts | 9 + .../cli/test/unit/ActiveExecutions.test.ts | 11 +- packages/core/src/Interfaces.ts | 6 + 10 files changed, 781 insertions(+), 12 deletions(-) create mode 100644 packages/cli/src/WorkflowRunnerProcess.ts diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index fdb97b6867e9c..0ca3dbe63e92f 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -1,4 +1,5 @@ import { Service } from 'typedi'; +import type { ChildProcess } from 'child_process'; import type PCancelable from 'p-cancelable'; import type { IDeferredPromise, @@ -33,7 +34,11 @@ export class ActiveExecutions { /** * Add a new active execution */ - async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise { + async add( + executionData: IWorkflowExecutionDataProcess, + process?: ChildProcess, + executionId?: string, + ): Promise { let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; if (executionId === undefined) { // Is a new execution so save in DB @@ -77,6 +82,7 @@ export class ActiveExecutions { this.activeExecutions[executionId] = { executionData, + process, startedAt: new Date(), postExecutePromises: [], status: executionStatus, @@ -129,15 +135,33 @@ export class ActiveExecutions { /** * Forces an execution to stop + * + * @param {string} executionId The id of the execution to stop + * @param {string} timeout String 'timeout' given if stop due to timeout */ - async stopExecution(executionId: string): Promise { + async stopExecution(executionId: string, timeout?: string): Promise { const execution = this.activeExecutions[executionId]; if (execution === undefined) { // There is no execution running with that id return; } - execution.workflowExecution!.cancel(); + // In case something goes wrong make sure that promise gets first + // returned that it gets then also resolved correctly. + if (execution.process !== undefined) { + // Workflow is running in subprocess + if (execution.process.connected) { + setTimeout(() => { + // execute on next event loop tick; + execution.process!.send({ + type: timeout || 'stopExecution', + }); + }, 1); + } + } else { + // Workflow is running in current process + execution.workflowExecution!.cancel(); + } return await this.getPostExecutePromise(executionId); } diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index abcfa0bccb5a4..30bbba4260428 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -31,6 +31,7 @@ import type { WorkflowExecute } from 'n8n-core'; import type PCancelable from 'p-cancelable'; +import type { ChildProcess } from 'child_process'; import type { AuthProviderType } from '@db/entities/AuthIdentity'; import type { SharedCredentials } from '@db/entities/SharedCredentials'; import type { TagEntity } from '@db/entities/TagEntity'; @@ -189,6 +190,7 @@ export interface IExecutionsCurrentSummary { export interface IExecutingWorkflowData { executionData: IWorkflowExecutionDataProcess; + process?: ChildProcess; startedAt: Date; postExecutePromises: Array>; responsePromise?: IDeferredPromise; @@ -523,6 +525,11 @@ export interface IWorkflowErrorData { }; } +export interface IProcessMessageDataHook { + hook: string; + parameters: any[]; +} + export interface IWorkflowExecutionDataProcess { destinationNode?: string; restartExecutionId?: string; @@ -537,6 +544,11 @@ export interface IWorkflowExecutionDataProcess { userId: string; } +export interface IWorkflowExecutionDataProcessWithExecution extends IWorkflowExecutionDataProcess { + executionId: string; + userId: string; +} + export interface IWorkflowExecuteProcess { startedAt: Date; workflow: Workflow; diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index e5bc7c21e295f..5b609f1ad2529 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -106,6 +106,7 @@ export class InternalHooks { }, }, execution_variables: { + executions_process: config.getEnv('executions.process'), executions_mode: config.getEnv('executions.mode'), executions_timeout: config.getEnv('executions.timeout'), executions_timeout_max: config.getEnv('executions.maxTimeout'), diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index fd722a730a99b..8bb09fc73be50 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -3,6 +3,7 @@ /* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { Container, Service } from 'typedi'; +import type { IProcessMessage } from 'n8n-core'; import { WorkflowExecute } from 'n8n-core'; import type { @@ -21,6 +22,8 @@ import { } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; +import { join as pathJoin } from 'path'; +import { fork } from 'child_process'; import { ActiveExecutions } from '@/ActiveExecutions'; import config from '@/config'; @@ -28,10 +31,16 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; import { ExternalHooks } from '@/ExternalHooks'; -import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces'; +import type { + IExecutionResponse, + IProcessMessageDataHook, + IWorkflowExecutionDataProcess, + IWorkflowExecutionDataProcessWithExecution, +} from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; import type { Job, JobData, JobResponse } from '@/Queue'; import { Queue } from '@/Queue'; +import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; @@ -46,6 +55,8 @@ export class WorkflowRunner { private executionsMode = config.getEnv('executions.mode'); + private executionsProcess = config.getEnv('executions.process'); + constructor( private readonly logger: Logger, private readonly activeExecutions: ActiveExecutions, @@ -60,6 +71,14 @@ export class WorkflowRunner { } } + /** The process did send a hook message so execute the appropriate hook */ + private async processHookMessage( + workflowHooks: WorkflowHooks, + hookData: IProcessMessageDataHook, + ) { + await workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters); + } + /** The process did error */ async processError( error: ExecutionError, @@ -167,7 +186,11 @@ export class WorkflowRunner { // frontend would not be possible await this.enqueueExecution(executionId, data, loadStaticData, realtime); } else { - await this.runMainProcess(executionId, data, loadStaticData, executionId); + if (this.executionsProcess === 'main') { + await this.runMainProcess(executionId, data, loadStaticData, executionId); + } else { + executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise); + } void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data); } @@ -352,7 +375,7 @@ export class WorkflowRunner { if (workflowTimeout > 0) { const timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds executionTimeout = setTimeout(() => { - void this.activeExecutions.stopExecution(executionId); + void this.activeExecutions.stopExecution(executionId, 'timeout'); }, timeout); } @@ -586,4 +609,188 @@ export class WorkflowRunner { this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); } + + /** Run the workflow in a child-process */ + private async runSubprocess( + data: IWorkflowExecutionDataProcess, + loadStaticData?: boolean, + restartExecutionId?: string, + responsePromise?: IDeferredPromise, + ): Promise { + const workflowId = data.workflowData.id; + let startedAt = new Date(); + const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js')); + + if (loadStaticData === true && workflowId) { + data.workflowData.staticData = + await this.workflowStaticDataService.getStaticDataById(workflowId); + } + + data.restartExecutionId = restartExecutionId; + + // Register the active execution + const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId); + + (data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId; + await this.executionRepository.updateStatus(executionId, 'running'); + + const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); + + try { + // Send all data to subprocess it needs to run the workflow + subprocess.send({ type: 'startWorkflow', data } as IProcessMessage); + } catch (error) { + await this.processError(error, new Date(), data.executionMode, executionId, workflowHooks); + return executionId; + } + + // Start timeout for the execution + let executionTimeout: NodeJS.Timeout; + + const workflowSettings = data.workflowData.settings ?? {}; + let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default + + const processTimeoutFunction = (timeout: number) => { + void this.activeExecutions.stopExecution(executionId, 'timeout'); + executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)); // minimum 5 seconds + }; + + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds + // Start timeout already now but give process at least 5 seconds to start. + // Without it could would it be possible that the workflow executions times out before it even got started if + // the timeout time is very short as the process start time can be quite long. + executionTimeout = setTimeout( + processTimeoutFunction, + Math.max(5000, workflowTimeout), + workflowTimeout, + ); + } + + // Create a list of child spawned executions + // If after the child process exits we have + // outstanding executions, we remove them + const childExecutionIds: string[] = []; + + // Listen to data from the subprocess + subprocess.on('message', async (message: IProcessMessage) => { + this.logger.debug( + `Received child process message of type ${message.type} for execution ID ${executionId}.`, + { executionId }, + ); + if (message.type === 'start') { + // Now that the execution actually started set the timeout again so that does not time out to early. + startedAt = new Date(); + if (workflowTimeout > 0) { + clearTimeout(executionTimeout); + executionTimeout = setTimeout(processTimeoutFunction, workflowTimeout, workflowTimeout); + } + } else if (message.type === 'end') { + clearTimeout(executionTimeout); + this.activeExecutions.remove(executionId, message.data.runData); + } else if (message.type === 'sendResponse') { + if (responsePromise) { + responsePromise.resolve(decodeWebhookResponse(message.data.response)); + } + } else if (message.type === 'sendDataToUI') { + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + WorkflowExecuteAdditionalData.sendDataToUI.bind({ sessionId: data.sessionId })( + message.data.type, + message.data.data, + ); + } else if (message.type === 'processError') { + clearTimeout(executionTimeout); + const executionError = message.data.executionError as ExecutionError; + await this.processError( + executionError, + startedAt, + data.executionMode, + executionId, + workflowHooks, + ); + } else if (message.type === 'processHook') { + await this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook); + } else if (message.type === 'timeout') { + // Execution timed out and its process has been terminated + const timeoutError = new WorkflowOperationError('Workflow execution timed out!'); + + // No need to add hook here as the subprocess takes care of calling the hooks + await this.processError(timeoutError, startedAt, data.executionMode, executionId); + } else if (message.type === 'startExecution') { + const executionId = await this.activeExecutions.add(message.data.runData); + childExecutionIds.push(executionId); + subprocess.send({ type: 'executionId', data: { executionId } } as IProcessMessage); + } else if (message.type === 'finishExecution') { + const executionIdIndex = childExecutionIds.indexOf(message.data.executionId); + if (executionIdIndex !== -1) { + childExecutionIds.splice(executionIdIndex, 1); + } + + if (message.data.result === undefined) { + const noDataError = new WorkflowOperationError('Workflow finished with no result data'); + const subWorkflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain( + data, + message.data.executionId, + ); + await this.processError( + noDataError, + startedAt, + data.executionMode, + message.data?.executionId, + subWorkflowHooks, + ); + } else { + this.activeExecutions.remove(message.data.executionId, message.data.result); + } + } + }); + + // Also get informed when the processes does exit especially when it did crash or timed out + subprocess.on('exit', async (code, signal) => { + if (signal === 'SIGTERM') { + this.logger.debug(`Subprocess for execution ID ${executionId} timed out.`, { executionId }); + // Execution timed out and its process has been terminated + const timeoutError = new WorkflowOperationError('Workflow execution timed out!'); + + await this.processError( + timeoutError, + startedAt, + data.executionMode, + executionId, + workflowHooks, + ); + } else if (code !== 0) { + this.logger.debug( + `Subprocess for execution ID ${executionId} finished with error code ${code}.`, + { executionId }, + ); + // Process did exit with error code, so something went wrong. + const executionError = new WorkflowOperationError( + 'Workflow execution process crashed for an unknown reason!', + ); + + await this.processError( + executionError, + startedAt, + data.executionMode, + executionId, + workflowHooks, + ); + } + + for (const executionId of childExecutionIds) { + // When the child process exits, if we still have + // pending child executions, we mark them as finished + // They will display as unknown to the user + // Instead of pending forever as executing when it + // actually isn't anymore. + + this.activeExecutions.remove(executionId); + } + + clearTimeout(executionTimeout); + }); + + return executionId; + } } diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts new file mode 100644 index 0000000000000..a4c1e9df83397 --- /dev/null +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -0,0 +1,506 @@ +/* eslint-disable @typescript-eslint/no-unsafe-argument */ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ +/* eslint-disable @typescript-eslint/no-shadow */ +/* eslint-disable @typescript-eslint/no-use-before-define */ +/* eslint-disable @typescript-eslint/unbound-method */ +import 'source-map-support/register'; +import 'reflect-metadata'; +import { setDefaultResultOrder } from 'dns'; + +import { Container } from 'typedi'; +import type { IProcessMessage } from 'n8n-core'; +import { BinaryDataService, WorkflowExecute } from 'n8n-core'; + +import type { + ExecutionError, + IDataObject, + IExecuteResponsePromiseData, + IExecuteWorkflowInfo, + INode, + INodeExecutionData, + IRun, + ITaskData, + IWorkflowExecuteAdditionalData, + IWorkflowExecuteHooks, + IWorkflowSettings, + NodeOperationError, + WorkflowExecuteMode, +} from 'n8n-workflow'; +import { + ErrorReporterProxy as ErrorReporter, + Workflow, + WorkflowHooks, + WorkflowOperationError, +} from 'n8n-workflow'; +import * as Db from '@/Db'; +import { ExternalHooks } from '@/ExternalHooks'; +import type { + IWorkflowExecuteProcess, + IWorkflowExecutionDataProcessWithExecution, +} from '@/Interfaces'; +import { NodeTypes } from '@/NodeTypes'; +import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; +import * as WebhookHelpers from '@/WebhookHelpers'; +import * as WorkflowHelpers from '@/WorkflowHelpers'; +import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; +import { Logger } from '@/Logger'; + +import config from '@/config'; +import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; +import { initErrorHandling } from '@/ErrorReporting'; +import { PermissionChecker } from '@/UserManagement/PermissionChecker'; +import { License } from '@/License'; +import { InternalHooks } from '@/InternalHooks'; +import { PostHogClient } from '@/posthog'; + +if (process.env.NODEJS_PREFER_IPV4 === 'true') { + setDefaultResultOrder('ipv4first'); +} + +class WorkflowRunnerProcess { + data: IWorkflowExecutionDataProcessWithExecution | undefined; + + logger: Logger; + + startedAt = new Date(); + + workflow: Workflow | undefined; + + workflowExecute: WorkflowExecute | undefined; + + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + executionIdCallback: (executionId: string) => void | undefined; + + childExecutions: { + [key: string]: IWorkflowExecuteProcess; + } = {}; + + static async stopProcess() { + setTimeout(() => { + // Attempt a graceful shutdown, giving executions 30 seconds to finish + process.exit(0); + }, 30000); + } + + constructor() { + this.logger = Container.get(Logger); + } + + async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise { + process.once('SIGTERM', WorkflowRunnerProcess.stopProcess); + process.once('SIGINT', WorkflowRunnerProcess.stopProcess); + + await initErrorHandling(); + + this.data = inputData; + const { userId } = inputData; + + this.logger.verbose('Initializing n8n sub-process', { + pid: process.pid, + workflowId: this.data.workflowData.id, + }); + + this.startedAt = new Date(); + + // Init db since we need to read the license. + await Db.init(); + + const nodeTypes = Container.get(NodeTypes); + await Container.get(LoadNodesAndCredentials).init(); + + // Load all external hooks + const externalHooks = Container.get(ExternalHooks); + await externalHooks.init(); + + await Container.get(PostHogClient).init(); + await Container.get(InternalHooks).init(); + + const binaryDataConfig = config.getEnv('binaryDataManager'); + await Container.get(BinaryDataService).init(binaryDataConfig); + + const license = Container.get(License); + await license.init(); + + const workflowSettings = this.data.workflowData.settings ?? {}; + + // Start timeout for the execution + let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default + if (workflowTimeout > 0) { + workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')); + } + + this.workflow = new Workflow({ + id: this.data.workflowData.id, + name: this.data.workflowData.name, + nodes: this.data.workflowData.nodes, + connections: this.data.workflowData.connections, + active: this.data.workflowData.active, + nodeTypes, + staticData: this.data.workflowData.staticData, + settings: this.data.workflowData.settings, + pinData: this.data.pinData, + }); + try { + await Container.get(PermissionChecker).check(this.workflow, userId); + } catch (error) { + const caughtError = error as NodeOperationError; + const failedExecutionData = generateFailedExecutionFromError( + this.data.executionMode, + caughtError, + caughtError.node, + ); + + // Force the `workflowExecuteAfter` hook to run since + // it's the one responsible for saving the execution + await this.sendHookToParentProcess('workflowExecuteAfter', [failedExecutionData]); + // Interrupt the workflow execution since we don't have all necessary creds. + return failedExecutionData; + } + const additionalData = await WorkflowExecuteAdditionalData.getBase( + userId, + undefined, + workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000, + ); + additionalData.restartExecutionId = this.data.restartExecutionId; + additionalData.hooks = this.getProcessForwardHooks(); + + additionalData.hooks.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + await sendToParentProcess('sendResponse', { + response: WebhookHelpers.encodeWebhookResponse(response), + }); + }, + ]; + + additionalData.executionId = inputData.executionId; + + additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({ + executionId: inputData.executionId, + }); + + additionalData.sendDataToUI = async (type: string, data: IDataObject | IDataObject[]) => { + if (workflowRunner.data!.executionMode !== 'manual') { + return; + } + + try { + await sendToParentProcess('sendDataToUI', { type, data }); + } catch (error) { + ErrorReporter.error(error); + this.logger.error( + `There was a problem sending UI data to parent process: "${error.message}"`, + ); + } + }; + const executeWorkflowFunction = additionalData.executeWorkflow; + additionalData.executeWorkflow = async ( + workflowInfo: IExecuteWorkflowInfo, + additionalData: IWorkflowExecuteAdditionalData, + options: { + parentWorkflowId: string; + inputData?: INodeExecutionData[]; + parentWorkflowSettings?: IWorkflowSettings; + }, + ): Promise | IRun> => { + const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData( + workflowInfo, + options.parentWorkflowId, + options.parentWorkflowSettings, + ); + const runData = await WorkflowExecuteAdditionalData.getRunData( + workflowData, + additionalData.userId, + options?.inputData, + ); + await sendToParentProcess('startExecution', { runData }); + const executionId: string = await new Promise((resolve) => { + this.executionIdCallback = (executionId: string) => { + resolve(executionId); + }; + }); + + void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId || '', runData); + + let result: IRun; + try { + const executeWorkflowFunctionOutput = (await executeWorkflowFunction( + workflowInfo, + additionalData, + { + parentWorkflowId: options?.parentWorkflowId, + inputData: options?.inputData, + parentExecutionId: executionId, + loadedWorkflowData: workflowData, + loadedRunData: runData, + parentWorkflowSettings: options?.parentWorkflowSettings, + }, + )) as { workflowExecute: WorkflowExecute; workflow: Workflow } as IWorkflowExecuteProcess; + const { workflowExecute } = executeWorkflowFunctionOutput; + this.childExecutions[executionId] = executeWorkflowFunctionOutput; + const { workflow } = executeWorkflowFunctionOutput; + result = await workflowExecute.processRunExecutionData(workflow); + await externalHooks.run('workflow.postExecute', [result, workflowData, executionId]); + void Container.get(InternalHooks).onWorkflowPostExecute( + executionId, + workflowData, + result, + additionalData.userId, + ); + await sendToParentProcess('finishExecution', { executionId, result }); + delete this.childExecutions[executionId]; + } catch (e) { + await sendToParentProcess('finishExecution', { executionId }); + delete this.childExecutions[executionId]; + // Throw same error we had + throw e; + } + + await sendToParentProcess('finishExecution', { executionId, result }); + + const returnData = WorkflowHelpers.getDataLastExecutedNodeData(result); + + if (returnData!.error) { + const error = new Error(returnData!.error.message); + error.stack = returnData!.error.stack; + throw error; + } + + return returnData!.data!.main; + }; + + if (this.data.executionData !== undefined) { + this.workflowExecute = new WorkflowExecute( + additionalData, + this.data.executionMode, + this.data.executionData, + ); + return await this.workflowExecute.processRunExecutionData(this.workflow); + } + if ( + this.data.runData === undefined || + this.data.startNodes === undefined || + this.data.startNodes.length === 0 + ) { + // Execute all nodes + + const startNode = WorkflowHelpers.getExecutionStartNode(this.data, this.workflow); + + // Can execute without webhook so go on + this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); + return await this.workflowExecute.run( + this.workflow, + startNode, + this.data.destinationNode, + this.data.pinData, + ); + } + // Execute only the nodes between start and destination nodes + this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); + return await this.workflowExecute.runPartialWorkflow( + this.workflow, + this.data.runData, + this.data.startNodes, + this.data.destinationNode, + this.data.pinData, + ); + } + + /** + * Sends hook data to the parent process that it executes them + */ + async sendHookToParentProcess(hook: string, parameters: any[]) { + try { + await sendToParentProcess('processHook', { + hook, + parameters, + }); + } catch (error) { + ErrorReporter.error(error); + this.logger.error(`There was a problem sending hook: "${hook}"`, { parameters, error }); + } + } + + /** + * Create a wrapper for hooks which simply forwards the data to + * the parent process where they then can be executed with access + * to database and to PushService + * + */ + getProcessForwardHooks(): WorkflowHooks { + const hookFunctions: IWorkflowExecuteHooks = { + nodeExecuteBefore: [ + async (nodeName: string): Promise => { + await this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]); + }, + ], + nodeExecuteAfter: [ + async (nodeName: string, data: ITaskData): Promise => { + await this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]); + }, + ], + workflowExecuteBefore: [ + async (): Promise => { + await this.sendHookToParentProcess('workflowExecuteBefore', []); + }, + ], + workflowExecuteAfter: [ + async (fullRunData: IRun, newStaticData?: IDataObject): Promise => { + await this.sendHookToParentProcess('workflowExecuteAfter', [fullRunData, newStaticData]); + }, + ], + nodeFetchedData: [ + async (workflowId: string, node: INode) => { + await this.sendHookToParentProcess('nodeFetchedData', [workflowId, node]); + }, + ], + }; + + const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute(); + + for (const key of Object.keys(preExecuteFunctions)) { + if (hookFunctions[key] === undefined) { + hookFunctions[key] = []; + } + hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); + } + + return new WorkflowHooks( + hookFunctions, + this.data!.executionMode, + this.data!.executionId, + this.data!.workflowData, + { sessionId: this.data!.sessionId, retryOf: this.data!.retryOf as string }, + ); + } +} + +/** + * Sends data to parent process + * + * @param {string} type The type of data to send + * @param {*} data The data + */ +async function sendToParentProcess(type: string, data: any): Promise { + return await new Promise((resolve, reject) => { + process.send!( + { + type, + data, + }, + (error: Error) => { + if (error) { + return reject(error); + } + + resolve(); + }, + ); + }); +} + +const workflowRunner = new WorkflowRunnerProcess(); + +// Listen to messages from parent process which send the data of +// the workflow to process +process.on('message', async (message: IProcessMessage) => { + try { + if (message.type === 'startWorkflow') { + await sendToParentProcess('start', {}); + + const runData = await workflowRunner.runWorkflow(message.data); + + await sendToParentProcess('end', { + runData, + }); + + // Once the workflow got executed make sure the process gets killed again + process.exit(); + } else if (message.type === 'stopExecution' || message.type === 'timeout') { + // The workflow execution should be stopped + let runData: IRun; + + if (workflowRunner.workflowExecute !== undefined) { + const executionIds = Object.keys(workflowRunner.childExecutions); + + for (const executionId of executionIds) { + const childWorkflowExecute = workflowRunner.childExecutions[executionId]; + runData = childWorkflowExecute.workflowExecute.getFullRunData( + workflowRunner.childExecutions[executionId].startedAt, + ); + const timeOutError = + message.type === 'timeout' + ? new WorkflowOperationError('Workflow execution timed out!') + : new WorkflowOperationError('Workflow-Execution has been canceled!'); + + // If there is any data send it to parent process, if execution timedout add the error + + await childWorkflowExecute.workflowExecute.processSuccessExecution( + workflowRunner.childExecutions[executionId].startedAt, + childWorkflowExecute.workflow, + timeOutError, + ); + } + + // Workflow started already executing + runData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt); + + const timeOutError = + message.type === 'timeout' + ? new WorkflowOperationError('Workflow execution timed out!') + : new WorkflowOperationError('Workflow-Execution has been canceled!'); + + runData.status = message.type === 'timeout' ? 'failed' : 'canceled'; + + // If there is any data send it to parent process, if execution timedout add the error + await workflowRunner.workflowExecute.processSuccessExecution( + workflowRunner.startedAt, + workflowRunner.workflow!, + timeOutError, + ); + } else { + // Workflow did not get started yet + runData = { + data: { + resultData: { + runData: {}, + }, + }, + finished: false, + mode: workflowRunner.data + ? workflowRunner.data.executionMode + : ('own' as WorkflowExecuteMode), + startedAt: workflowRunner.startedAt, + stoppedAt: new Date(), + status: 'canceled', + }; + + await workflowRunner.sendHookToParentProcess('workflowExecuteAfter', [runData]); + } + + await sendToParentProcess(message.type === 'timeout' ? message.type : 'end', { + runData, + }); + + // Stop process + process.exit(); + } else if (message.type === 'executionId') { + workflowRunner.executionIdCallback(message.data.executionId); + } + } catch (error) { + workflowRunner.logger.error(error.message); + + // Catch all uncaught errors and forward them to parent process + const executionError = { + ...error, + name: error.name || 'Error', + message: error.message, + stack: error.stack, + } as ExecutionError; + + await sendToParentProcess('processError', { + executionError, + }); + process.exit(); + } +}); diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 5cd5b0cbbfe02..8bfb5c9c197f9 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -84,8 +84,8 @@ export abstract class BaseCommand extends Command { ); } if (process.env.EXECUTIONS_PROCESS === 'own') { - throw new ApplicationError( - 'Own mode has been removed. If you need the isolation and performance gains, please consider using queue mode.', + this.logger.warn( + 'Own mode has been deprecated and will be removed in a future version of n8n. If you need the isolation and performance gains, please consider using queue mode.', ); } diff --git a/packages/cli/src/config/index.ts b/packages/cli/src/config/index.ts index 5cc2d2a0c1829..7300b0dcf739f 100644 --- a/packages/cli/src/config/index.ts +++ b/packages/cli/src/config/index.ts @@ -6,6 +6,7 @@ import { inTest, inE2ETests } from '@/constants'; if (inE2ETests) { // Skip loading config from env variables in end-to-end tests + process.env.EXECUTIONS_PROCESS = 'main'; process.env.N8N_DIAGNOSTICS_ENABLED = 'false'; process.env.N8N_PUBLIC_API_DISABLED = 'true'; process.env.EXTERNAL_FRONTEND_HOOKS_URLS = ''; diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 4402f82838d45..56728aecb98cb 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -234,6 +234,15 @@ export const schema = { }, executions: { + // By default workflows get always executed in the main process. + // TODO: remove this and all usage of `executions.process` when `own` mode is deleted + process: { + doc: 'In what process workflows should be executed.', + format: ['main', 'own'] as const, + default: 'main', + env: 'EXECUTIONS_PROCESS', + }, + mode: { doc: 'If it should run executions directly or via queue', format: ['regular', 'queue'] as const, diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 892d976b4bd3c..f30fd3778ab6d 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -45,7 +45,11 @@ describe('ActiveExecutions', () => { test('Should update execution if add is called with execution ID', async () => { const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID); + const executionId = await activeExecutions.add( + newExecution, + undefined, + FAKE_SECOND_EXECUTION_ID, + ); expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID); expect(activeExecutions.getActiveExecutions().length).toBe(1); @@ -63,7 +67,7 @@ describe('ActiveExecutions', () => { test('Should successfully attach execution to valid executionId', async () => { const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); + await activeExecutions.add(newExecution, undefined, FAKE_EXECUTION_ID); const deferredPromise = mockCancelablePromise(); expect(() => @@ -73,7 +77,7 @@ describe('ActiveExecutions', () => { test('Should attach and resolve response promise to existing execution', async () => { const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); + await activeExecutions.add(newExecution, undefined, FAKE_EXECUTION_ID); const deferredPromise = await mockDeferredPromise(); activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise); const fakeResponse = { data: { resultData: { runData: {} } } }; @@ -125,7 +129,6 @@ function mockExecutionData(): IWorkflowExecutionDataProcess { return { executionMode: 'manual', workflowData: { - id: '123', name: 'Test workflow 1', active: false, createdAt: new Date(), diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index 000f2a8125b07..40e5d88eee6a8 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -7,6 +7,12 @@ import type { export type Class = new (...args: A) => T; +export interface IProcessMessage { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + data?: any; + type: string; +} + export interface IResponseError extends Error { statusCode?: number; }