Skip to content

Commit

Permalink
feat(core): Remove own execution-process mode (#8490)
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy authored Jan 30, 2024
1 parent 79c9763 commit 121a55b
Show file tree
Hide file tree
Showing 10 changed files with 14 additions and 784 deletions.
30 changes: 3 additions & 27 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Service } from 'typedi';
import type { ChildProcess } from 'child_process';
import type PCancelable from 'p-cancelable';
import type {
IDeferredPromise,
Expand Down Expand Up @@ -34,11 +33,7 @@ export class ActiveExecutions {
/**
* Add a new active execution
*/
async add(
executionData: IWorkflowExecutionDataProcess,
process?: ChildProcess,
executionId?: string,
): Promise<string> {
async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise<string> {
let executionStatus: ExecutionStatus = executionId ? 'running' : 'new';
if (executionId === undefined) {
// Is a new execution so save in DB
Expand Down Expand Up @@ -82,7 +77,6 @@ export class ActiveExecutions {

this.activeExecutions[executionId] = {
executionData,
process,
startedAt: new Date(),
postExecutePromises: [],
status: executionStatus,
Expand Down Expand Up @@ -152,33 +146,15 @@ export class ActiveExecutions {

/**
* Forces an execution to stop
*
* @param {string} executionId The id of the execution to stop
* @param {string} timeout String 'timeout' given if stop due to timeout
*/
async stopExecution(executionId: string, timeout?: string): Promise<IRun | undefined> {
async stopExecution(executionId: string): Promise<IRun | undefined> {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
// There is no execution running with that id
return;
}

// In case something goes wrong make sure that promise gets first
// returned that it gets then also resolved correctly.
if (execution.process !== undefined) {
// Workflow is running in subprocess
if (execution.process.connected) {
setTimeout(() => {
// execute on next event loop tick;
execution.process!.send({
type: timeout || 'stopExecution',
});
}, 1);
}
} else {
// Workflow is running in current process
execution.workflowExecution!.cancel();
}
execution.workflowExecution!.cancel();

return await this.getPostExecutePromise(executionId);
}
Expand Down
13 changes: 0 additions & 13 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import type { WorkflowExecute } from 'n8n-core';

import type PCancelable from 'p-cancelable';

import type { ChildProcess } from 'child_process';

import type { DatabaseType } from '@db/types';
import type { AuthProviderType } from '@db/entities/AuthIdentity';
import type { SharedCredentials } from '@db/entities/SharedCredentials';
Expand Down Expand Up @@ -192,7 +190,6 @@ export interface IExecutionsCurrentSummary {

export interface IExecutingWorkflowData {
executionData: IWorkflowExecutionDataProcess;
process?: ChildProcess;
startedAt: Date;
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
Expand Down Expand Up @@ -558,11 +555,6 @@ export interface IWorkflowErrorData {
};
}

export interface IProcessMessageDataHook {
hook: string;
parameters: any[];
}

export interface IWorkflowExecutionDataProcess {
destinationNode?: string;
restartExecutionId?: string;
Expand All @@ -577,11 +569,6 @@ export interface IWorkflowExecutionDataProcess {
userId: string;
}

export interface IWorkflowExecutionDataProcessWithExecution extends IWorkflowExecutionDataProcess {
executionId: string;
userId: string;
}

export interface IWorkflowExecuteProcess {
startedAt: Date;
workflow: Workflow;
Expand Down
1 change: 0 additions & 1 deletion packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ export class Server extends AbstractServer {
},
},
executionVariables: {
executions_process: config.getEnv('executions.process'),
executions_mode: config.getEnv('executions.mode'),
executions_timeout: config.getEnv('executions.timeout'),
executions_timeout_max: config.getEnv('executions.maxTimeout'),
Expand Down
217 changes: 5 additions & 212 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Container, Service } from 'typedi';
import type { IProcessMessage } from 'n8n-core';
import { WorkflowExecute } from 'n8n-core';

import type {
Expand All @@ -22,25 +21,17 @@ import {
} from 'n8n-workflow';

import PCancelable from 'p-cancelable';
import { join as pathJoin } from 'path';
import { fork } from 'child_process';

import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { MessageEventBus } from '@/eventbus';
import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service';
import { ExternalHooks } from '@/ExternalHooks';
import type {
IExecutionResponse,
IProcessMessageDataHook,
IWorkflowExecutionDataProcess,
IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces';
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobResponse } from '@/Queue';
import { Queue } from '@/Queue';
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
Expand All @@ -56,8 +47,6 @@ export class WorkflowRunner {

private executionsMode = config.getEnv('executions.mode');

private executionsProcess = config.getEnv('executions.process');

constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
Expand All @@ -68,14 +57,6 @@ export class WorkflowRunner {
private readonly permissionChecker: PermissionChecker,
) {}

/** The process did send a hook message so execute the appropriate hook */
private async processHookMessage(
workflowHooks: WorkflowHooks,
hookData: IProcessMessageDataHook,
) {
await workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters);
}

/** The process did error */
async processError(
error: ExecutionError,
Expand Down Expand Up @@ -189,11 +170,7 @@ export class WorkflowRunner {
responsePromise,
);
} else {
if (this.executionsProcess === 'main') {
executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise);
} else {
executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise);
}
executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data);
}

Expand Down Expand Up @@ -283,7 +260,7 @@ export class WorkflowRunner {
additionalData.restartExecutionId = restartExecutionId;

// Register the active execution
const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId);
const executionId = await this.activeExecutions.add(data, restartExecutionId);
additionalData.executionId = executionId;

this.logger.verbose(
Expand Down Expand Up @@ -380,7 +357,7 @@ export class WorkflowRunner {
if (workflowTimeout > 0) {
const timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds
executionTimeout = setTimeout(() => {
void this.activeExecutions.stopExecution(executionId, 'timeout');
void this.activeExecutions.stopExecution(executionId);
}, timeout);
}

Expand Down Expand Up @@ -428,7 +405,7 @@ export class WorkflowRunner {
// TODO: If "loadStaticData" is set to true it has to load data new on worker

// Register the active execution
const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId);
const executionId = await this.activeExecutions.add(data, restartExecutionId);
if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}
Expand Down Expand Up @@ -626,188 +603,4 @@ export class WorkflowRunner {
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}

/** Run the workflow in a child-process */
private async runSubprocess(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
const workflowId = data.workflowData.id;
let startedAt = new Date();
const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js'));

if (loadStaticData === true && workflowId) {
data.workflowData.staticData =
await this.workflowStaticDataService.getStaticDataById(workflowId);
}

data.restartExecutionId = restartExecutionId;

// Register the active execution
const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId);

(data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId;
await this.executionRepository.updateStatus(executionId, 'running');

const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);

try {
// Send all data to subprocess it needs to run the workflow
subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);
} catch (error) {
await this.processError(error, new Date(), data.executionMode, executionId, workflowHooks);
return executionId;
}

// Start timeout for the execution
let executionTimeout: NodeJS.Timeout;

const workflowSettings = data.workflowData.settings ?? {};
let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default

const processTimeoutFunction = (timeout: number) => {
void this.activeExecutions.stopExecution(executionId, 'timeout');
executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)); // minimum 5 seconds
};

if (workflowTimeout > 0) {
workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds
// Start timeout already now but give process at least 5 seconds to start.
// Without it could would it be possible that the workflow executions times out before it even got started if
// the timeout time is very short as the process start time can be quite long.
executionTimeout = setTimeout(
processTimeoutFunction,
Math.max(5000, workflowTimeout),
workflowTimeout,
);
}

// Create a list of child spawned executions
// If after the child process exits we have
// outstanding executions, we remove them
const childExecutionIds: string[] = [];

// Listen to data from the subprocess
subprocess.on('message', async (message: IProcessMessage) => {
this.logger.debug(
`Received child process message of type ${message.type} for execution ID ${executionId}.`,
{ executionId },
);
if (message.type === 'start') {
// Now that the execution actually started set the timeout again so that does not time out to early.
startedAt = new Date();
if (workflowTimeout > 0) {
clearTimeout(executionTimeout);
executionTimeout = setTimeout(processTimeoutFunction, workflowTimeout, workflowTimeout);
}
} else if (message.type === 'end') {
clearTimeout(executionTimeout);
this.activeExecutions.remove(executionId, message.data.runData);
} else if (message.type === 'sendResponse') {
if (responsePromise) {
responsePromise.resolve(decodeWebhookResponse(message.data.response));
}
} else if (message.type === 'sendDataToUI') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
WorkflowExecuteAdditionalData.sendDataToUI.bind({ sessionId: data.sessionId })(
message.data.type,
message.data.data,
);
} else if (message.type === 'processError') {
clearTimeout(executionTimeout);
const executionError = message.data.executionError as ExecutionError;
await this.processError(
executionError,
startedAt,
data.executionMode,
executionId,
workflowHooks,
);
} else if (message.type === 'processHook') {
await this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
} else if (message.type === 'timeout') {
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');

// No need to add hook here as the subprocess takes care of calling the hooks
await this.processError(timeoutError, startedAt, data.executionMode, executionId);
} else if (message.type === 'startExecution') {
const executionId = await this.activeExecutions.add(message.data.runData);
childExecutionIds.push(executionId);
subprocess.send({ type: 'executionId', data: { executionId } } as IProcessMessage);
} else if (message.type === 'finishExecution') {
const executionIdIndex = childExecutionIds.indexOf(message.data.executionId);
if (executionIdIndex !== -1) {
childExecutionIds.splice(executionIdIndex, 1);
}

if (message.data.result === undefined) {
const noDataError = new WorkflowOperationError('Workflow finished with no result data');
const subWorkflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(
data,
message.data.executionId,
);
await this.processError(
noDataError,
startedAt,
data.executionMode,
message.data?.executionId,
subWorkflowHooks,
);
} else {
this.activeExecutions.remove(message.data.executionId, message.data.result);
}
}
});

// Also get informed when the processes does exit especially when it did crash or timed out
subprocess.on('exit', async (code, signal) => {
if (signal === 'SIGTERM') {
this.logger.debug(`Subprocess for execution ID ${executionId} timed out.`, { executionId });
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');

await this.processError(
timeoutError,
startedAt,
data.executionMode,
executionId,
workflowHooks,
);
} else if (code !== 0) {
this.logger.debug(
`Subprocess for execution ID ${executionId} finished with error code ${code}.`,
{ executionId },
);
// Process did exit with error code, so something went wrong.
const executionError = new WorkflowOperationError(
'Workflow execution process crashed for an unknown reason!',
);

await this.processError(
executionError,
startedAt,
data.executionMode,
executionId,
workflowHooks,
);
}

for (const executionId of childExecutionIds) {
// When the child process exits, if we still have
// pending child executions, we mark them as finished
// They will display as unknown to the user
// Instead of pending forever as executing when it
// actually isn't anymore.

this.activeExecutions.remove(executionId);
}

clearTimeout(executionTimeout);
});

return executionId;
}
}
Loading

0 comments on commit 121a55b

Please sign in to comment.