Skip to content

Commit

Permalink
fix(core): Update subworkflow execution status correctly (#10764)
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy authored Sep 11, 2024
1 parent b9d157d commit 4f94319
Show file tree
Hide file tree
Showing 16 changed files with 51 additions and 78 deletions.
7 changes: 5 additions & 2 deletions packages/cli/src/__tests__/active-executions.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { ActiveExecutions } from '@/active-executions';
import PCancelable from 'p-cancelable';
import { v4 as uuid } from 'uuid';
import type { IExecuteResponsePromiseData, IRun } from 'n8n-workflow';
import type {
IExecuteResponsePromiseData,
IRun,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { mock } from 'jest-mock-extended';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
Expand Down
3 changes: 1 addition & 2 deletions packages/cli/src/__tests__/workflow-helpers.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { type Workflow } from 'n8n-workflow';
import type { Workflow, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { getExecutionStartNode } from '@/workflow-helpers';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';

describe('WorkflowHelpers', () => {
describe('getExecutionStartNode', () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/active-executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
IExecuteResponsePromiseData,
IRun,
ExecutionStatus,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
Expand All @@ -19,7 +20,6 @@ import type {
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
IWorkflowExecutionDataProcess,
} from '@/interfaces';
import { isWorkflowIdValid } from '@/utils';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/execute-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import { Container } from 'typedi';
import { Flags } from '@oclif/core';
import fs from 'fs';
import os from 'os';
import type { IRun, ITaskData } from 'n8n-workflow';
import type { IRun, ITaskData, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { sep } from 'path';
import { diff } from 'json-diff';
import pick from 'lodash/pick';

import { ActiveExecutions } from '@/active-executions';
import { WorkflowRunner } from '@/workflow-runner';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IWorkflowDb } from '@/interfaces';
import type { User } from '@/databases/entities/user';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { OwnershipService } from '@/services/ownership.service';
Expand Down
3 changes: 1 addition & 2 deletions packages/cli/src/commands/execute.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Container } from 'typedi';
import { Flags } from '@oclif/core';
import type { IWorkflowBase } from 'n8n-workflow';
import type { IWorkflowBase, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ApplicationError, ExecutionBaseError } from 'n8n-workflow';

import { ActiveExecutions } from '@/active-executions';
import { WorkflowRunner } from '@/workflow-runner';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils';
import { BaseCommand } from './base-command';

Expand Down
3 changes: 1 addition & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { createReadStream, createWriteStream, existsSync } from 'fs';
import { pipeline } from 'stream/promises';
import replaceStream from 'replacestream';
import glob from 'fast-glob';
import { jsonParse, randomString } from 'n8n-workflow';
import { jsonParse, randomString, type IWorkflowExecutionDataProcess } from 'n8n-workflow';

import config from '@/config';
import { ActiveExecutions } from '@/active-executions';
Expand All @@ -26,7 +26,6 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { WaitTracker } from '@/wait-tracker';
import { BaseCommand } from './base-command';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/workflow-runner';
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/events/relay-event-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import type {
IPersonalizationSurveyAnswersV4,
IRun,
IWorkflowBase,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IWorkflowDb } from '@/interfaces';
import type { ProjectRole } from '@/databases/entities/project-relation';
import type { GlobalRole } from '@/databases/entities/user';
import type { AuthProviderType } from '@/databases/entities/auth-identity';
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/executions/execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
IRunExecutionData,
IWorkflowBase,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
Expand All @@ -21,7 +22,6 @@ import type {
IExecutionFlattedResponse,
IExecutionResponse,
IWorkflowDb,
IWorkflowExecutionDataProcess,
} from '@/interfaces';
import { NodeTypes } from '@/node-types';
import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types';
Expand Down
19 changes: 1 addition & 18 deletions packages/cli/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import type {
IDataObject,
IDeferredPromise,
IExecuteResponsePromiseData,
IPinData,
IRun,
IRunData,
IRunExecutionData,
ITaskData,
ITelemetryTrackProperties,
Expand All @@ -22,7 +20,7 @@ import type {
FeatureFlags,
INodeProperties,
IUserSettings,
StartNodeData,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';

import type { ActiveWorkflowManager } from '@/active-workflow-manager';
Expand Down Expand Up @@ -495,21 +493,6 @@ export interface IWorkflowErrorData {
};
}

export interface IWorkflowExecutionDataProcess {
destinationNode?: string;
restartExecutionId?: string;
executionMode: WorkflowExecuteMode;
executionData?: IRunExecutionData;
runData?: IRunData;
pinData?: IPinData;
retryOf?: string;
pushRef?: string;
startNodes?: StartNodeData[];
workflowData: IWorkflowBase;
userId?: string;
projectId?: string;
}

export interface IWorkflowExecuteProcess {
startedAt: Date;
workflow: Workflow;
Expand Down
7 changes: 5 additions & 2 deletions packages/cli/src/wait-tracker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { ApplicationError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow';
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
type IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { Service } from 'typedi';
import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import { WorkflowRunner } from '@/workflow-runner';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { OwnershipService } from '@/services/ownership.service';
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/webhooks/webhook-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import type {
WebhookResponseMode,
Workflow,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
Expand All @@ -53,7 +54,7 @@ import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { UnprocessableRequestError } from '@/errors/response-errors/unprocessable.error';
import type { Project } from '@/databases/entities/project';
import type { IExecutionDb, IWorkflowDb, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IExecutionDb, IWorkflowDb } from '@/interfaces';

/**
* Returns all the webhooks which should be created for the given workflow
Expand Down
42 changes: 8 additions & 34 deletions packages/cli/src/workflow-execute-additional-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import type {
ExecutionStatus,
ExecutionError,
EventNamesAiNodesType,
CallbackManager,
ExecuteWorkflowOptions,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
Expand All @@ -42,7 +43,6 @@ import { ExternalHooks } from '@/external-hooks';
import type {
IPushDataExecutionFinished,
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcess,
IWorkflowErrorData,
IPushDataType,
ExecutionPayload,
Expand Down Expand Up @@ -714,13 +714,11 @@ export async function getRunData(
},
};

const runData: IWorkflowExecutionDataProcess = {
return {
executionMode: mode,
executionData: runExecutionData,
workflowData,
};

return runData;
}

export async function getWorkflowData(
Expand Down Expand Up @@ -769,23 +767,15 @@ export async function getWorkflowData(
async function executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options: {
node?: INode;
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
},
options: ExecuteWorkflowOptions,
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();

const nodeTypes = Container.get(NodeTypes);
const activeExecutions = Container.get(ActiveExecutions);
const eventService = Container.get(EventService);
const executionRepository = Container.get(ExecutionRepository);

const workflowData =
options.loadedWorkflowData ??
Expand All @@ -805,13 +795,8 @@ async function executeWorkflow(

const runData = options.loadedRunData ?? (await getRunData(workflowData, options.inputData));

let executionId;

if (options.parentExecutionId !== undefined) {
executionId = options.parentExecutionId;
} else {
executionId = options.parentExecutionId ?? (await activeExecutions.add(runData));
}
const executionId = await activeExecutions.add(runData);
await executionRepository.updateStatus(executionId, 'running');

Container.get(EventService).emit('workflow-pre-execute', { executionId, data: runData });

Expand Down Expand Up @@ -862,14 +847,6 @@ async function executeWorkflow(
runData.executionMode,
runExecutionData,
);
if (options.parentExecutionId !== undefined) {
// Must be changed to become typed
return {
startedAt: new Date(),
workflow,
workflowExecute,
};
}
const execution = workflowExecute.processRunExecutionData(workflow);
activeExecutions.attachWorkflowExecution(executionId, execution);
data = await execution;
Expand Down Expand Up @@ -909,10 +886,7 @@ async function executeWorkflow(
// remove execution from active executions
activeExecutions.remove(executionId, fullRunData);

await Container.get(ExecutionRepository).updateExistingExecution(
executionId,
fullExecutionData,
);
await executionRepository.updateExistingExecution(executionId, fullExecutionData);
throw objectToError(
{
...executionError,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/workflow-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import type {
WorkflowOperationError,
Workflow,
NodeOperationError,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';

import type { IWorkflowExecutionDataProcess } from '@/interfaces';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import { CredentialsRepository } from '@/databases/repositories/credentials.repository';
import { VariablesService } from '@/environments/variables/variables.service.ee';
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/workflow-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type {
IRun,
WorkflowExecuteMode,
WorkflowHooks,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ErrorReporterProxy as ErrorReporter,
Expand All @@ -26,7 +27,7 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks';
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/interfaces';
import type { IExecutionResponse } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import type { Job, JobData, JobResult } from '@/scaling/scaling.types';
import type { ScalingService } from '@/scaling/scaling.service';
Expand Down
8 changes: 2 additions & 6 deletions packages/cli/src/workflows/workflow-execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
IRunExecutionData,
IWorkflowExecuteAdditionalData,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
SubworkflowOperationError,
Expand All @@ -21,12 +22,7 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import * as WorkflowHelpers from '@/workflow-helpers';
import type { WorkflowRequest } from '@/workflows/workflow.request';
import type {
ExecutionPayload,
IWorkflowDb,
IWorkflowErrorData,
IWorkflowExecutionDataProcess,
} from '@/interfaces';
import type { ExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { WorkflowRunner } from '@/workflow-runner';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
Expand Down
18 changes: 16 additions & 2 deletions packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2167,13 +2167,27 @@ export const eventNamesAiNodes = [

export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number];

export interface IWorkflowExecutionDataProcess {
destinationNode?: string;
restartExecutionId?: string;
executionMode: WorkflowExecuteMode;
executionData?: IRunExecutionData;
runData?: IRunData;
pinData?: IPinData;
retryOf?: string;
pushRef?: string;
startNodes?: StartNodeData[];
workflowData: IWorkflowBase;
userId?: string;
projectId?: string;
}

export interface ExecuteWorkflowOptions {
node?: INode;
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentExecutionId?: string;
loadedWorkflowData?: IWorkflowBase;
loadedRunData?: any;
loadedRunData?: IWorkflowExecutionDataProcess;
parentWorkflowSettings?: IWorkflowSettings;
parentCallbackManager?: CallbackManager;
}
Expand Down

0 comments on commit 4f94319

Please sign in to comment.