From 7a196697fbaf772bd34dfdb4633c58b1aa5c1966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 30 Jan 2024 11:23:56 +0100 Subject: [PATCH 1/2] feat(core): Remove `own` mode --- packages/cli/src/ActiveExecutions.ts | 30 +- packages/cli/src/Interfaces.ts | 8 - packages/cli/src/Server.ts | 1 - packages/cli/src/WorkflowRunner.ts | 203 +------ packages/cli/src/WorkflowRunnerProcess.ts | 506 ------------------ packages/cli/src/commands/BaseCommand.ts | 4 +- packages/cli/src/config/index.ts | 1 - packages/cli/src/config/schema.ts | 9 - .../cli/test/unit/ActiveExecutions.test.ts | 11 +- packages/core/src/Interfaces.ts | 6 - 10 files changed, 13 insertions(+), 766 deletions(-) delete mode 100644 packages/cli/src/WorkflowRunnerProcess.ts diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index f9f27f6e50a55..abcc729927d19 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -1,5 +1,4 @@ import { Service } from 'typedi'; -import type { ChildProcess } from 'child_process'; import type PCancelable from 'p-cancelable'; import type { IDeferredPromise, @@ -34,11 +33,7 @@ export class ActiveExecutions { /** * Add a new active execution */ - async add( - executionData: IWorkflowExecutionDataProcess, - process?: ChildProcess, - executionId?: string, - ): Promise { + async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise { let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; if (executionId === undefined) { // Is a new execution so save in DB @@ -82,7 +77,6 @@ export class ActiveExecutions { this.activeExecutions[executionId] = { executionData, - process, startedAt: new Date(), postExecutePromises: [], status: executionStatus, @@ -152,33 +146,15 @@ export class ActiveExecutions { /** * Forces an execution to stop - * - * @param {string} executionId The id of the execution to stop - * @param {string} timeout String 'timeout' given if stop due to timeout */ - async stopExecution(executionId: string, timeout?: string): Promise { + async stopExecution(executionId: string): Promise { const execution = this.activeExecutions[executionId]; if (execution === undefined) { // There is no execution running with that id return; } - // In case something goes wrong make sure that promise gets first - // returned that it gets then also resolved correctly. - if (execution.process !== undefined) { - // Workflow is running in subprocess - if (execution.process.connected) { - setTimeout(() => { - // execute on next event loop tick; - execution.process!.send({ - type: timeout || 'stopExecution', - }); - }, 1); - } - } else { - // Workflow is running in current process - execution.workflowExecution!.cancel(); - } + execution.workflowExecution!.cancel(); return await this.getPostExecutePromise(executionId); } diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 06b4f9ae95a46..e24a3dae5f473 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -31,8 +31,6 @@ import type { WorkflowExecute } from 'n8n-core'; import type PCancelable from 'p-cancelable'; -import type { ChildProcess } from 'child_process'; - import type { DatabaseType } from '@db/types'; import type { AuthProviderType } from '@db/entities/AuthIdentity'; import type { SharedCredentials } from '@db/entities/SharedCredentials'; @@ -192,7 +190,6 @@ export interface IExecutionsCurrentSummary { export interface IExecutingWorkflowData { executionData: IWorkflowExecutionDataProcess; - process?: ChildProcess; startedAt: Date; postExecutePromises: Array>; responsePromise?: IDeferredPromise; @@ -577,11 +574,6 @@ export interface IWorkflowExecutionDataProcess { userId: string; } -export interface IWorkflowExecutionDataProcessWithExecution extends IWorkflowExecutionDataProcess { - executionId: string; - userId: string; -} - export interface IWorkflowExecuteProcess { startedAt: Date; workflow: Workflow; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 7f1b06b6b8614..d0532b3782df6 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -156,7 +156,6 @@ export class Server extends AbstractServer { }, }, executionVariables: { - executions_process: config.getEnv('executions.process'), executions_mode: config.getEnv('executions.mode'), executions_timeout: config.getEnv('executions.timeout'), executions_timeout_max: config.getEnv('executions.maxTimeout'), diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index ebd5a5a0cc476..80b279b35079b 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -3,7 +3,6 @@ /* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { Container, Service } from 'typedi'; -import type { IProcessMessage } from 'n8n-core'; import { WorkflowExecute } from 'n8n-core'; import type { @@ -22,8 +21,6 @@ import { } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; -import { join as pathJoin } from 'path'; -import { fork } from 'child_process'; import { ActiveExecutions } from '@/ActiveExecutions'; import config from '@/config'; @@ -35,12 +32,10 @@ import type { IExecutionResponse, IProcessMessageDataHook, IWorkflowExecutionDataProcess, - IWorkflowExecutionDataProcessWithExecution, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; import type { Job, JobData, JobResponse } from '@/Queue'; import { Queue } from '@/Queue'; -import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; @@ -56,8 +51,6 @@ export class WorkflowRunner { private executionsMode = config.getEnv('executions.mode'); - private executionsProcess = config.getEnv('executions.process'); - constructor( private readonly logger: Logger, private readonly activeExecutions: ActiveExecutions, @@ -189,11 +182,7 @@ export class WorkflowRunner { responsePromise, ); } else { - if (this.executionsProcess === 'main') { - executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); - } else { - executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise); - } + executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data); } @@ -283,7 +272,7 @@ export class WorkflowRunner { additionalData.restartExecutionId = restartExecutionId; // Register the active execution - const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId); + const executionId = await this.activeExecutions.add(data, restartExecutionId); additionalData.executionId = executionId; this.logger.verbose( @@ -380,7 +369,7 @@ export class WorkflowRunner { if (workflowTimeout > 0) { const timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds executionTimeout = setTimeout(() => { - void this.activeExecutions.stopExecution(executionId, 'timeout'); + void this.activeExecutions.stopExecution(executionId); }, timeout); } @@ -428,7 +417,7 @@ export class WorkflowRunner { // TODO: If "loadStaticData" is set to true it has to load data new on worker // Register the active execution - const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId); + const executionId = await this.activeExecutions.add(data, restartExecutionId); if (responsePromise) { this.activeExecutions.attachResponsePromise(executionId, responsePromise); } @@ -626,188 +615,4 @@ export class WorkflowRunner { this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); return executionId; } - - /** Run the workflow in a child-process */ - private async runSubprocess( - data: IWorkflowExecutionDataProcess, - loadStaticData?: boolean, - restartExecutionId?: string, - responsePromise?: IDeferredPromise, - ): Promise { - const workflowId = data.workflowData.id; - let startedAt = new Date(); - const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js')); - - if (loadStaticData === true && workflowId) { - data.workflowData.staticData = - await this.workflowStaticDataService.getStaticDataById(workflowId); - } - - data.restartExecutionId = restartExecutionId; - - // Register the active execution - const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId); - - (data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId; - await this.executionRepository.updateStatus(executionId, 'running'); - - const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); - - try { - // Send all data to subprocess it needs to run the workflow - subprocess.send({ type: 'startWorkflow', data } as IProcessMessage); - } catch (error) { - await this.processError(error, new Date(), data.executionMode, executionId, workflowHooks); - return executionId; - } - - // Start timeout for the execution - let executionTimeout: NodeJS.Timeout; - - const workflowSettings = data.workflowData.settings ?? {}; - let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default - - const processTimeoutFunction = (timeout: number) => { - void this.activeExecutions.stopExecution(executionId, 'timeout'); - executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)); // minimum 5 seconds - }; - - if (workflowTimeout > 0) { - workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds - // Start timeout already now but give process at least 5 seconds to start. - // Without it could would it be possible that the workflow executions times out before it even got started if - // the timeout time is very short as the process start time can be quite long. - executionTimeout = setTimeout( - processTimeoutFunction, - Math.max(5000, workflowTimeout), - workflowTimeout, - ); - } - - // Create a list of child spawned executions - // If after the child process exits we have - // outstanding executions, we remove them - const childExecutionIds: string[] = []; - - // Listen to data from the subprocess - subprocess.on('message', async (message: IProcessMessage) => { - this.logger.debug( - `Received child process message of type ${message.type} for execution ID ${executionId}.`, - { executionId }, - ); - if (message.type === 'start') { - // Now that the execution actually started set the timeout again so that does not time out to early. - startedAt = new Date(); - if (workflowTimeout > 0) { - clearTimeout(executionTimeout); - executionTimeout = setTimeout(processTimeoutFunction, workflowTimeout, workflowTimeout); - } - } else if (message.type === 'end') { - clearTimeout(executionTimeout); - this.activeExecutions.remove(executionId, message.data.runData); - } else if (message.type === 'sendResponse') { - if (responsePromise) { - responsePromise.resolve(decodeWebhookResponse(message.data.response)); - } - } else if (message.type === 'sendDataToUI') { - // eslint-disable-next-line @typescript-eslint/no-unsafe-call - WorkflowExecuteAdditionalData.sendDataToUI.bind({ sessionId: data.sessionId })( - message.data.type, - message.data.data, - ); - } else if (message.type === 'processError') { - clearTimeout(executionTimeout); - const executionError = message.data.executionError as ExecutionError; - await this.processError( - executionError, - startedAt, - data.executionMode, - executionId, - workflowHooks, - ); - } else if (message.type === 'processHook') { - await this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook); - } else if (message.type === 'timeout') { - // Execution timed out and its process has been terminated - const timeoutError = new WorkflowOperationError('Workflow execution timed out!'); - - // No need to add hook here as the subprocess takes care of calling the hooks - await this.processError(timeoutError, startedAt, data.executionMode, executionId); - } else if (message.type === 'startExecution') { - const executionId = await this.activeExecutions.add(message.data.runData); - childExecutionIds.push(executionId); - subprocess.send({ type: 'executionId', data: { executionId } } as IProcessMessage); - } else if (message.type === 'finishExecution') { - const executionIdIndex = childExecutionIds.indexOf(message.data.executionId); - if (executionIdIndex !== -1) { - childExecutionIds.splice(executionIdIndex, 1); - } - - if (message.data.result === undefined) { - const noDataError = new WorkflowOperationError('Workflow finished with no result data'); - const subWorkflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain( - data, - message.data.executionId, - ); - await this.processError( - noDataError, - startedAt, - data.executionMode, - message.data?.executionId, - subWorkflowHooks, - ); - } else { - this.activeExecutions.remove(message.data.executionId, message.data.result); - } - } - }); - - // Also get informed when the processes does exit especially when it did crash or timed out - subprocess.on('exit', async (code, signal) => { - if (signal === 'SIGTERM') { - this.logger.debug(`Subprocess for execution ID ${executionId} timed out.`, { executionId }); - // Execution timed out and its process has been terminated - const timeoutError = new WorkflowOperationError('Workflow execution timed out!'); - - await this.processError( - timeoutError, - startedAt, - data.executionMode, - executionId, - workflowHooks, - ); - } else if (code !== 0) { - this.logger.debug( - `Subprocess for execution ID ${executionId} finished with error code ${code}.`, - { executionId }, - ); - // Process did exit with error code, so something went wrong. - const executionError = new WorkflowOperationError( - 'Workflow execution process crashed for an unknown reason!', - ); - - await this.processError( - executionError, - startedAt, - data.executionMode, - executionId, - workflowHooks, - ); - } - - for (const executionId of childExecutionIds) { - // When the child process exits, if we still have - // pending child executions, we mark them as finished - // They will display as unknown to the user - // Instead of pending forever as executing when it - // actually isn't anymore. - - this.activeExecutions.remove(executionId); - } - - clearTimeout(executionTimeout); - }); - - return executionId; - } } diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts deleted file mode 100644 index a4c1e9df83397..0000000000000 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ /dev/null @@ -1,506 +0,0 @@ -/* eslint-disable @typescript-eslint/no-unsafe-argument */ -/* eslint-disable @typescript-eslint/no-unsafe-member-access */ -/* eslint-disable @typescript-eslint/no-unsafe-assignment */ -/* eslint-disable @typescript-eslint/no-shadow */ -/* eslint-disable @typescript-eslint/no-use-before-define */ -/* eslint-disable @typescript-eslint/unbound-method */ -import 'source-map-support/register'; -import 'reflect-metadata'; -import { setDefaultResultOrder } from 'dns'; - -import { Container } from 'typedi'; -import type { IProcessMessage } from 'n8n-core'; -import { BinaryDataService, WorkflowExecute } from 'n8n-core'; - -import type { - ExecutionError, - IDataObject, - IExecuteResponsePromiseData, - IExecuteWorkflowInfo, - INode, - INodeExecutionData, - IRun, - ITaskData, - IWorkflowExecuteAdditionalData, - IWorkflowExecuteHooks, - IWorkflowSettings, - NodeOperationError, - WorkflowExecuteMode, -} from 'n8n-workflow'; -import { - ErrorReporterProxy as ErrorReporter, - Workflow, - WorkflowHooks, - WorkflowOperationError, -} from 'n8n-workflow'; -import * as Db from '@/Db'; -import { ExternalHooks } from '@/ExternalHooks'; -import type { - IWorkflowExecuteProcess, - IWorkflowExecutionDataProcessWithExecution, -} from '@/Interfaces'; -import { NodeTypes } from '@/NodeTypes'; -import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; -import * as WebhookHelpers from '@/WebhookHelpers'; -import * as WorkflowHelpers from '@/WorkflowHelpers'; -import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; -import { Logger } from '@/Logger'; - -import config from '@/config'; -import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; -import { initErrorHandling } from '@/ErrorReporting'; -import { PermissionChecker } from '@/UserManagement/PermissionChecker'; -import { License } from '@/License'; -import { InternalHooks } from '@/InternalHooks'; -import { PostHogClient } from '@/posthog'; - -if (process.env.NODEJS_PREFER_IPV4 === 'true') { - setDefaultResultOrder('ipv4first'); -} - -class WorkflowRunnerProcess { - data: IWorkflowExecutionDataProcessWithExecution | undefined; - - logger: Logger; - - startedAt = new Date(); - - workflow: Workflow | undefined; - - workflowExecute: WorkflowExecute | undefined; - - // eslint-disable-next-line @typescript-eslint/no-invalid-void-type - executionIdCallback: (executionId: string) => void | undefined; - - childExecutions: { - [key: string]: IWorkflowExecuteProcess; - } = {}; - - static async stopProcess() { - setTimeout(() => { - // Attempt a graceful shutdown, giving executions 30 seconds to finish - process.exit(0); - }, 30000); - } - - constructor() { - this.logger = Container.get(Logger); - } - - async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise { - process.once('SIGTERM', WorkflowRunnerProcess.stopProcess); - process.once('SIGINT', WorkflowRunnerProcess.stopProcess); - - await initErrorHandling(); - - this.data = inputData; - const { userId } = inputData; - - this.logger.verbose('Initializing n8n sub-process', { - pid: process.pid, - workflowId: this.data.workflowData.id, - }); - - this.startedAt = new Date(); - - // Init db since we need to read the license. - await Db.init(); - - const nodeTypes = Container.get(NodeTypes); - await Container.get(LoadNodesAndCredentials).init(); - - // Load all external hooks - const externalHooks = Container.get(ExternalHooks); - await externalHooks.init(); - - await Container.get(PostHogClient).init(); - await Container.get(InternalHooks).init(); - - const binaryDataConfig = config.getEnv('binaryDataManager'); - await Container.get(BinaryDataService).init(binaryDataConfig); - - const license = Container.get(License); - await license.init(); - - const workflowSettings = this.data.workflowData.settings ?? {}; - - // Start timeout for the execution - let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default - if (workflowTimeout > 0) { - workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')); - } - - this.workflow = new Workflow({ - id: this.data.workflowData.id, - name: this.data.workflowData.name, - nodes: this.data.workflowData.nodes, - connections: this.data.workflowData.connections, - active: this.data.workflowData.active, - nodeTypes, - staticData: this.data.workflowData.staticData, - settings: this.data.workflowData.settings, - pinData: this.data.pinData, - }); - try { - await Container.get(PermissionChecker).check(this.workflow, userId); - } catch (error) { - const caughtError = error as NodeOperationError; - const failedExecutionData = generateFailedExecutionFromError( - this.data.executionMode, - caughtError, - caughtError.node, - ); - - // Force the `workflowExecuteAfter` hook to run since - // it's the one responsible for saving the execution - await this.sendHookToParentProcess('workflowExecuteAfter', [failedExecutionData]); - // Interrupt the workflow execution since we don't have all necessary creds. - return failedExecutionData; - } - const additionalData = await WorkflowExecuteAdditionalData.getBase( - userId, - undefined, - workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000, - ); - additionalData.restartExecutionId = this.data.restartExecutionId; - additionalData.hooks = this.getProcessForwardHooks(); - - additionalData.hooks.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - await sendToParentProcess('sendResponse', { - response: WebhookHelpers.encodeWebhookResponse(response), - }); - }, - ]; - - additionalData.executionId = inputData.executionId; - - additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({ - executionId: inputData.executionId, - }); - - additionalData.sendDataToUI = async (type: string, data: IDataObject | IDataObject[]) => { - if (workflowRunner.data!.executionMode !== 'manual') { - return; - } - - try { - await sendToParentProcess('sendDataToUI', { type, data }); - } catch (error) { - ErrorReporter.error(error); - this.logger.error( - `There was a problem sending UI data to parent process: "${error.message}"`, - ); - } - }; - const executeWorkflowFunction = additionalData.executeWorkflow; - additionalData.executeWorkflow = async ( - workflowInfo: IExecuteWorkflowInfo, - additionalData: IWorkflowExecuteAdditionalData, - options: { - parentWorkflowId: string; - inputData?: INodeExecutionData[]; - parentWorkflowSettings?: IWorkflowSettings; - }, - ): Promise | IRun> => { - const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData( - workflowInfo, - options.parentWorkflowId, - options.parentWorkflowSettings, - ); - const runData = await WorkflowExecuteAdditionalData.getRunData( - workflowData, - additionalData.userId, - options?.inputData, - ); - await sendToParentProcess('startExecution', { runData }); - const executionId: string = await new Promise((resolve) => { - this.executionIdCallback = (executionId: string) => { - resolve(executionId); - }; - }); - - void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId || '', runData); - - let result: IRun; - try { - const executeWorkflowFunctionOutput = (await executeWorkflowFunction( - workflowInfo, - additionalData, - { - parentWorkflowId: options?.parentWorkflowId, - inputData: options?.inputData, - parentExecutionId: executionId, - loadedWorkflowData: workflowData, - loadedRunData: runData, - parentWorkflowSettings: options?.parentWorkflowSettings, - }, - )) as { workflowExecute: WorkflowExecute; workflow: Workflow } as IWorkflowExecuteProcess; - const { workflowExecute } = executeWorkflowFunctionOutput; - this.childExecutions[executionId] = executeWorkflowFunctionOutput; - const { workflow } = executeWorkflowFunctionOutput; - result = await workflowExecute.processRunExecutionData(workflow); - await externalHooks.run('workflow.postExecute', [result, workflowData, executionId]); - void Container.get(InternalHooks).onWorkflowPostExecute( - executionId, - workflowData, - result, - additionalData.userId, - ); - await sendToParentProcess('finishExecution', { executionId, result }); - delete this.childExecutions[executionId]; - } catch (e) { - await sendToParentProcess('finishExecution', { executionId }); - delete this.childExecutions[executionId]; - // Throw same error we had - throw e; - } - - await sendToParentProcess('finishExecution', { executionId, result }); - - const returnData = WorkflowHelpers.getDataLastExecutedNodeData(result); - - if (returnData!.error) { - const error = new Error(returnData!.error.message); - error.stack = returnData!.error.stack; - throw error; - } - - return returnData!.data!.main; - }; - - if (this.data.executionData !== undefined) { - this.workflowExecute = new WorkflowExecute( - additionalData, - this.data.executionMode, - this.data.executionData, - ); - return await this.workflowExecute.processRunExecutionData(this.workflow); - } - if ( - this.data.runData === undefined || - this.data.startNodes === undefined || - this.data.startNodes.length === 0 - ) { - // Execute all nodes - - const startNode = WorkflowHelpers.getExecutionStartNode(this.data, this.workflow); - - // Can execute without webhook so go on - this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); - return await this.workflowExecute.run( - this.workflow, - startNode, - this.data.destinationNode, - this.data.pinData, - ); - } - // Execute only the nodes between start and destination nodes - this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); - return await this.workflowExecute.runPartialWorkflow( - this.workflow, - this.data.runData, - this.data.startNodes, - this.data.destinationNode, - this.data.pinData, - ); - } - - /** - * Sends hook data to the parent process that it executes them - */ - async sendHookToParentProcess(hook: string, parameters: any[]) { - try { - await sendToParentProcess('processHook', { - hook, - parameters, - }); - } catch (error) { - ErrorReporter.error(error); - this.logger.error(`There was a problem sending hook: "${hook}"`, { parameters, error }); - } - } - - /** - * Create a wrapper for hooks which simply forwards the data to - * the parent process where they then can be executed with access - * to database and to PushService - * - */ - getProcessForwardHooks(): WorkflowHooks { - const hookFunctions: IWorkflowExecuteHooks = { - nodeExecuteBefore: [ - async (nodeName: string): Promise => { - await this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]); - }, - ], - nodeExecuteAfter: [ - async (nodeName: string, data: ITaskData): Promise => { - await this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]); - }, - ], - workflowExecuteBefore: [ - async (): Promise => { - await this.sendHookToParentProcess('workflowExecuteBefore', []); - }, - ], - workflowExecuteAfter: [ - async (fullRunData: IRun, newStaticData?: IDataObject): Promise => { - await this.sendHookToParentProcess('workflowExecuteAfter', [fullRunData, newStaticData]); - }, - ], - nodeFetchedData: [ - async (workflowId: string, node: INode) => { - await this.sendHookToParentProcess('nodeFetchedData', [workflowId, node]); - }, - ], - }; - - const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute(); - - for (const key of Object.keys(preExecuteFunctions)) { - if (hookFunctions[key] === undefined) { - hookFunctions[key] = []; - } - hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - - return new WorkflowHooks( - hookFunctions, - this.data!.executionMode, - this.data!.executionId, - this.data!.workflowData, - { sessionId: this.data!.sessionId, retryOf: this.data!.retryOf as string }, - ); - } -} - -/** - * Sends data to parent process - * - * @param {string} type The type of data to send - * @param {*} data The data - */ -async function sendToParentProcess(type: string, data: any): Promise { - return await new Promise((resolve, reject) => { - process.send!( - { - type, - data, - }, - (error: Error) => { - if (error) { - return reject(error); - } - - resolve(); - }, - ); - }); -} - -const workflowRunner = new WorkflowRunnerProcess(); - -// Listen to messages from parent process which send the data of -// the workflow to process -process.on('message', async (message: IProcessMessage) => { - try { - if (message.type === 'startWorkflow') { - await sendToParentProcess('start', {}); - - const runData = await workflowRunner.runWorkflow(message.data); - - await sendToParentProcess('end', { - runData, - }); - - // Once the workflow got executed make sure the process gets killed again - process.exit(); - } else if (message.type === 'stopExecution' || message.type === 'timeout') { - // The workflow execution should be stopped - let runData: IRun; - - if (workflowRunner.workflowExecute !== undefined) { - const executionIds = Object.keys(workflowRunner.childExecutions); - - for (const executionId of executionIds) { - const childWorkflowExecute = workflowRunner.childExecutions[executionId]; - runData = childWorkflowExecute.workflowExecute.getFullRunData( - workflowRunner.childExecutions[executionId].startedAt, - ); - const timeOutError = - message.type === 'timeout' - ? new WorkflowOperationError('Workflow execution timed out!') - : new WorkflowOperationError('Workflow-Execution has been canceled!'); - - // If there is any data send it to parent process, if execution timedout add the error - - await childWorkflowExecute.workflowExecute.processSuccessExecution( - workflowRunner.childExecutions[executionId].startedAt, - childWorkflowExecute.workflow, - timeOutError, - ); - } - - // Workflow started already executing - runData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt); - - const timeOutError = - message.type === 'timeout' - ? new WorkflowOperationError('Workflow execution timed out!') - : new WorkflowOperationError('Workflow-Execution has been canceled!'); - - runData.status = message.type === 'timeout' ? 'failed' : 'canceled'; - - // If there is any data send it to parent process, if execution timedout add the error - await workflowRunner.workflowExecute.processSuccessExecution( - workflowRunner.startedAt, - workflowRunner.workflow!, - timeOutError, - ); - } else { - // Workflow did not get started yet - runData = { - data: { - resultData: { - runData: {}, - }, - }, - finished: false, - mode: workflowRunner.data - ? workflowRunner.data.executionMode - : ('own' as WorkflowExecuteMode), - startedAt: workflowRunner.startedAt, - stoppedAt: new Date(), - status: 'canceled', - }; - - await workflowRunner.sendHookToParentProcess('workflowExecuteAfter', [runData]); - } - - await sendToParentProcess(message.type === 'timeout' ? message.type : 'end', { - runData, - }); - - // Stop process - process.exit(); - } else if (message.type === 'executionId') { - workflowRunner.executionIdCallback(message.data.executionId); - } - } catch (error) { - workflowRunner.logger.error(error.message); - - // Catch all uncaught errors and forward them to parent process - const executionError = { - ...error, - name: error.name || 'Error', - message: error.message, - stack: error.stack, - } as ExecutionError; - - await sendToParentProcess('processError', { - executionError, - }); - process.exit(); - } -}); diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 61aab82702253..a6a81805d4ebe 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -84,8 +84,8 @@ export abstract class BaseCommand extends Command { ); } if (process.env.EXECUTIONS_PROCESS === 'own') { - this.logger.warn( - 'Own mode has been deprecated and will be removed in a future version of n8n. If you need the isolation and performance gains, please consider using queue mode.', + throw new ApplicationError( + 'Own mode has been removed. If you need the isolation and performance gains, please consider using queue mode.', ); } diff --git a/packages/cli/src/config/index.ts b/packages/cli/src/config/index.ts index 7300b0dcf739f..5cc2d2a0c1829 100644 --- a/packages/cli/src/config/index.ts +++ b/packages/cli/src/config/index.ts @@ -6,7 +6,6 @@ import { inTest, inE2ETests } from '@/constants'; if (inE2ETests) { // Skip loading config from env variables in end-to-end tests - process.env.EXECUTIONS_PROCESS = 'main'; process.env.N8N_DIAGNOSTICS_ENABLED = 'false'; process.env.N8N_PUBLIC_API_DISABLED = 'true'; process.env.EXTERNAL_FRONTEND_HOOKS_URLS = ''; diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 56728aecb98cb..4402f82838d45 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -234,15 +234,6 @@ export const schema = { }, executions: { - // By default workflows get always executed in the main process. - // TODO: remove this and all usage of `executions.process` when `own` mode is deleted - process: { - doc: 'In what process workflows should be executed.', - format: ['main', 'own'] as const, - default: 'main', - env: 'EXECUTIONS_PROCESS', - }, - mode: { doc: 'If it should run executions directly or via queue', format: ['regular', 'queue'] as const, diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index f30fd3778ab6d..892d976b4bd3c 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -45,11 +45,7 @@ describe('ActiveExecutions', () => { test('Should update execution if add is called with execution ID', async () => { const newExecution = mockExecutionData(); - const executionId = await activeExecutions.add( - newExecution, - undefined, - FAKE_SECOND_EXECUTION_ID, - ); + const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID); expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID); expect(activeExecutions.getActiveExecutions().length).toBe(1); @@ -67,7 +63,7 @@ describe('ActiveExecutions', () => { test('Should successfully attach execution to valid executionId', async () => { const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, undefined, FAKE_EXECUTION_ID); + await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); const deferredPromise = mockCancelablePromise(); expect(() => @@ -77,7 +73,7 @@ describe('ActiveExecutions', () => { test('Should attach and resolve response promise to existing execution', async () => { const newExecution = mockExecutionData(); - await activeExecutions.add(newExecution, undefined, FAKE_EXECUTION_ID); + await activeExecutions.add(newExecution, FAKE_EXECUTION_ID); const deferredPromise = await mockDeferredPromise(); activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise); const fakeResponse = { data: { resultData: { runData: {} } } }; @@ -129,6 +125,7 @@ function mockExecutionData(): IWorkflowExecutionDataProcess { return { executionMode: 'manual', workflowData: { + id: '123', name: 'Test workflow 1', active: false, createdAt: new Date(), diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index 40e5d88eee6a8..000f2a8125b07 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -7,12 +7,6 @@ import type { export type Class = new (...args: A) => T; -export interface IProcessMessage { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - data?: any; - type: string; -} - export interface IResponseError extends Error { statusCode?: number; } From 681ef0a53230e26b9d567012938b7ca207dd29e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 30 Jan 2024 12:25:05 +0100 Subject: [PATCH 2/2] delete more unused code --- packages/cli/src/Interfaces.ts | 5 ----- packages/cli/src/WorkflowRunner.ts | 14 +------------- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index e24a3dae5f473..c067ba9baac45 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -555,11 +555,6 @@ export interface IWorkflowErrorData { }; } -export interface IProcessMessageDataHook { - hook: string; - parameters: any[]; -} - export interface IWorkflowExecutionDataProcess { destinationNode?: string; restartExecutionId?: string; diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 80b279b35079b..b3aecba57e35a 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -28,11 +28,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { MessageEventBus } from '@/eventbus'; import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; import { ExternalHooks } from '@/ExternalHooks'; -import type { - IExecutionResponse, - IProcessMessageDataHook, - IWorkflowExecutionDataProcess, -} from '@/Interfaces'; +import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; import type { Job, JobData, JobResponse } from '@/Queue'; import { Queue } from '@/Queue'; @@ -61,14 +57,6 @@ export class WorkflowRunner { private readonly permissionChecker: PermissionChecker, ) {} - /** The process did send a hook message so execute the appropriate hook */ - private async processHookMessage( - workflowHooks: WorkflowHooks, - hookData: IProcessMessageDataHook, - ) { - await workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters); - } - /** The process did error */ async processError( error: ExecutionError,