Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Retain execution data between partial executions (new flow) #11828

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion packages/@n8n/api-types/src/push/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,19 @@ type NodeExecuteAfter = {
};
};

type DeleteRunData = {
type: 'deleteRunData';
data: {
executionId: string;
nodeNamesToPurge: string[];
};
};

export type ExecutionPushMessage =
| ExecutionStarted
| ExecutionWaiting
| ExecutionFinished
| ExecutionRecovered
| NodeExecuteBefore
| NodeExecuteAfter;
| NodeExecuteAfter
| DeleteRunData;
12 changes: 12 additions & 0 deletions packages/cli/src/workflow-execute-additional-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const pushInstance = Container.get(Push);
return {
deleteRunData: [
async function (this: WorkflowHooks, nodeNamesToPurge: string[]) {
const { pushRef, executionId } = this;

if (pushRef === undefined) {
return;
}

pushInstance.send('deleteRunData', { nodeNamesToPurge, executionId }, pushRef);
},
],
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { pushRef, executionId } = this;
Expand Down Expand Up @@ -363,6 +374,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return {
deleteRunData: [],
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/PartialExecutionUtils/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ interface StubNode {
name: string;
parameters?: INodeParameters;
disabled?: boolean;
type?: string;
}

export function createNodeData(stubData: StubNode): INode {
return {
name: stubData.name,
parameters: stubData.parameters ?? {},
type: 'test.set',
type: stubData.type ?? 'n8n-nodes-base.set',
typeVersion: 1,
id: 'uuid-1234',
position: [100, 100],
Expand Down
9 changes: 8 additions & 1 deletion packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,16 @@ export class WorkflowExecute {
// 5. Handle Cycles
startNodes = handleCycles(graph, startNodes, trigger);

// 6. Clean Run Data
// 6.1 Clean Run Data
const newRunData: IRunData = cleanRunData(runData, graph, startNodes);

// 6.2 Inform FE about what run data to purge from the workflow store
const nodeNamesToPurge = new Set(Object.keys(runData));
for (const nodeName of Object.keys(newRunData)) {
nodeNamesToPurge.delete(nodeName);
}
void this.executeHook('deleteRunData', [[...nodeNamesToPurge]]);

// 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(subgraph, new Set(startNodes), runData, pinData ?? {});
Expand Down
44 changes: 41 additions & 3 deletions packages/core/test/WorkflowExecute.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import type { IRun, WorkflowTestData } from 'n8n-workflow';
import type { IRun, IRunData, WorkflowTestData } from 'n8n-workflow';
import {
ApplicationError,
createDeferredPromise,
NodeExecutionOutput,
Workflow,
} from 'n8n-workflow';

import { DirectedGraph } from '@/PartialExecutionUtils';
import { createNodeData, toITaskData } from '@/PartialExecutionUtils/__tests__/helpers';
import { WorkflowExecute } from '@/WorkflowExecute';

import * as Helpers from './helpers';
import { legacyWorkflowExecuteTests, v1WorkflowExecuteTests } from './helpers/constants';

const nodeTypes = Helpers.NodeTypes();

describe('WorkflowExecute', () => {
describe('v0 execution order', () => {
const tests: WorkflowTestData[] = legacyWorkflowExecuteTests;

const executionMode = 'manual';
const nodeTypes = Helpers.NodeTypes();

for (const testData of tests) {
test(testData.description, async () => {
Expand Down Expand Up @@ -206,7 +209,7 @@ describe('WorkflowExecute', () => {
}
});

describe('WorkflowExecute, NodeExecutionOutput type test', () => {
test('WorkflowExecute, NodeExecutionOutput type test', () => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this was supposed to be test and not describe.

//TODO Add more tests here when execution hints are added to some node types
const nodeExecutionOutput = new NodeExecutionOutput(
[[{ json: { data: 123 } }]],
Expand All @@ -217,4 +220,39 @@ describe('WorkflowExecute', () => {
expect(nodeExecutionOutput[0][0].json.data).toEqual(123);
expect(nodeExecutionOutput.getHints()[0].message).toEqual('TEXT HINT');
});

describe('runPartialWorkflow2', () => {
test('executes the deleteRunData hook with the names of the nodes that have stale run data', async () => {
// ARRANGE
const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder);
const workflowExecute = new WorkflowExecute(additionalData, 'manual');

const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const workflow = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 })
.toWorkflow({ name: '', active: false, nodeTypes });
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { name: trigger.name } }])],
[node1.name]: [toITaskData([{ data: { name: node1.name } }])],
[node2.name]: [toITaskData([{ data: { name: node2.name } }])],
};

jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn());
const executeHookSpy = jest
.spyOn(workflowExecute, 'executeHook')
.mockImplementation(jest.fn());

// ACT
await workflowExecute.runPartialWorkflow2(workflow, runData, 'node1');

// ASSERT
expect(executeHookSpy).toHaveBeenCalledTimes(1);
expect(executeHookSpy).toHaveBeenCalledWith('deleteRunData', [[node1.name, node2.name]]);
});
});
});
5 changes: 5 additions & 0 deletions packages/core/test/helpers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
import { NodeConnectionType } from 'n8n-workflow';

import { If } from '../../../nodes-base/dist/nodes/If/If.node';
import { ManualTrigger } from '../../../nodes-base/dist/nodes/ManualTrigger/ManualTrigger.node';
import { Merge } from '../../../nodes-base/dist/nodes/Merge/Merge.node';
import { NoOp } from '../../../nodes-base/dist/nodes/NoOp/NoOp.node';
import { Set } from '../../../nodes-base/dist/nodes/Set/Set.node';
Expand All @@ -33,6 +34,10 @@ export const predefinedNodesTypes: INodeTypeData = {
type: new Start(),
sourcePath: '',
},
'n8n-nodes-base.manualTrigger': {
type: new ManualTrigger(),
sourcePath: '',
},
Comment on lines +37 to +40
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed an actual trigger to test the runPartialWorkflow2, especially the call to findTriggerForPartialExecution.

The start node above is not marked as a trigger, so I added the manual trigger here.

'n8n-nodes-base.versionTest': {
sourcePath: '',
type: {
Expand Down
53 changes: 53 additions & 0 deletions packages/editor-ui/src/composables/usePushConnection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,58 @@ describe('usePushConnection()', () => {
expect(pushConnection.retryTimeout).not.toBeNull();
});
});

describe('deleteRunData', async () => {
it("enqueues messages if we don't have the active execution id yet", async () => {
uiStore.isActionActive.workflowRunning = true;
const event: PushMessage = {
type: 'deleteRunData',
data: {
executionId: '1',
nodeNamesToPurge: ['foo'],
},
};

expect(pushConnection.retryTimeout.value).toBeNull();
expect(pushConnection.pushMessageQueue.value.length).toBe(0);

const result = await pushConnection.pushMessageReceived(event);

expect(result).toBe(false);
expect(pushConnection.pushMessageQueue.value).toHaveLength(1);
expect(pushConnection.pushMessageQueue.value).toContainEqual({
message: event,
retriesLeft: 5,
});
expect(pushConnection.retryTimeout).not.toBeNull();
});

it('clears the execution data for all nodes in the event', async () => {
// ARRANGE
uiStore.isActionActive.workflowRunning = true;
const event: PushMessage = {
type: 'deleteRunData',
data: {
executionId: '1',
nodeNamesToPurge: ['foo', 'bar'],
},
};
workflowsStore.activeExecutionId = event.data.executionId;

expect(pushConnection.retryTimeout.value).toBeNull();
expect(pushConnection.pushMessageQueue.value.length).toBe(0);

vi.spyOn(workflowsStore, 'clearNodeExecutionData').mockReturnValueOnce();

// ACT
const result = await pushConnection.pushMessageReceived(event);

// ASSERT
expect(result).toBe(true);
expect(workflowsStore.clearNodeExecutionData).toHaveBeenCalledTimes(2);
expect(workflowsStore.clearNodeExecutionData).toHaveBeenNthCalledWith(1, 'foo');
expect(workflowsStore.clearNodeExecutionData).toHaveBeenNthCalledWith(2, 'bar');
});
});
});
});
10 changes: 9 additions & 1 deletion packages/editor-ui/src/composables/usePushConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
return false;
}

if (receivedData.type === 'nodeExecuteAfter' || receivedData.type === 'nodeExecuteBefore') {
if (
receivedData.type === 'nodeExecuteAfter' ||
receivedData.type === 'nodeExecuteBefore' ||
receivedData.type === 'deleteRunData'
) {
if (!uiStore.isActionActive['workflowRunning']) {
// No workflow is running so ignore the messages
return false;
Expand Down Expand Up @@ -450,6 +454,10 @@ export function usePushConnection({ router }: { router: ReturnType<typeof useRou
// Nothing to do
} else if (receivedData.type === 'executionStarted') {
// Nothing to do
} else if (receivedData.type === 'deleteRunData') {
for (const nodeName of receivedData.data.nodeNamesToPurge) {
workflowsStore.clearNodeExecutionData(nodeName);
}
} else if (receivedData.type === 'nodeExecuteAfter') {
// A node finished to execute. Add its data
const pushData = receivedData.data;
Expand Down
73 changes: 73 additions & 0 deletions packages/editor-ui/src/composables/useRunWorkflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import { useUIStore } from '@/stores/ui.store';
import { useWorkflowHelpers } from '@/composables/useWorkflowHelpers';
import { useToast } from './useToast';
import { useI18n } from '@/composables/useI18n';
import { useLocalStorage } from '@vueuse/core';
import { ref } from 'vue';

vi.mock('@/stores/workflows.store', () => ({
useWorkflowsStore: vi.fn().mockReturnValue({
Expand All @@ -29,6 +31,16 @@ vi.mock('@/stores/workflows.store', () => ({
}),
}));

vi.mock('@vueuse/core', async () => {
// eslint-disable-next-line @typescript-eslint/consistent-type-imports
const originalModule = await vi.importActual<typeof import('@vueuse/core')>('@vueuse/core');

return {
...originalModule, // Keep all original exports
useLocalStorage: vi.fn().mockReturnValue({ value: undefined }), // Mock useLocalStorage
};
});

vi.mock('@/composables/useTelemetry', () => ({
useTelemetry: vi.fn().mockReturnValue({ track: vi.fn() }),
}));
Expand Down Expand Up @@ -99,6 +111,7 @@ describe('useRunWorkflow({ router })', () => {

beforeEach(() => {
uiStore.activeActions = [];
vi.clearAllMocks();
});

describe('runWorkflowApi()', () => {
Expand Down Expand Up @@ -250,6 +263,66 @@ describe('useRunWorkflow({ router })', () => {
const result = await runWorkflow({});
expect(result).toEqual(mockExecutionResponse);
});

it('does not use the original run data if `PartialExecution.version` is set to 0', async () => {
// ARRANGE
const mockExecutionResponse = { executionId: '123' };
const mockRunData = { nodeName: [] };
const { runWorkflow } = useRunWorkflow({ router });

vi.mocked(useLocalStorage).mockReturnValueOnce(ref(0));
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({
name: 'Test Workflow',
} as Workflow);
vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({
id: 'workflowId',
nodes: [],
} as unknown as IWorkflowData);
vi.mocked(workflowsStore).getWorkflowRunData = mockRunData;

// ACT
const result = await runWorkflow({});

// ASSERT
expect(result).toEqual(mockExecutionResponse);
expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalledTimes(1);
expect(vi.mocked(workflowsStore.setWorkflowExecutionData).mock.calls[0][0]).toMatchObject({
data: { resultData: { runData: {} } },
});
});

it('retains the original run data if `PartialExecution.version` is set to 1', async () => {
// ARRANGE
const mockExecutionResponse = { executionId: '123' };
const mockRunData = { nodeName: [] };
const { runWorkflow } = useRunWorkflow({ router });

vi.mocked(useLocalStorage).mockReturnValueOnce(ref(1));
vi.mocked(rootStore).pushConnectionActive = true;
vi.mocked(workflowsStore).runWorkflow.mockResolvedValue(mockExecutionResponse);
vi.mocked(workflowsStore).nodesIssuesExist = false;
vi.mocked(workflowHelpers).getCurrentWorkflow.mockReturnValue({
name: 'Test Workflow',
} as Workflow);
vi.mocked(workflowHelpers).getWorkflowDataToSave.mockResolvedValue({
id: 'workflowId',
nodes: [],
} as unknown as IWorkflowData);
vi.mocked(workflowsStore).getWorkflowRunData = mockRunData;

// ACT
const result = await runWorkflow({});

// ASSERT
expect(result).toEqual(mockExecutionResponse);
expect(workflowsStore.setWorkflowExecutionData).toHaveBeenCalledTimes(1);
expect(vi.mocked(workflowsStore.setWorkflowExecutionData).mock.calls[0][0]).toMatchObject({
data: { resultData: { runData: mockRunData } },
});
});
});

describe('consolidateRunDataAndStartNodes()', () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/editor-ui/src/composables/useRunWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
executedNode,
data: {
resultData: {
runData: newRunData ?? {},
runData: startRunData.runData ?? {},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes that previous run data disappears. This should be the same run data that is sent to the BE.

startRunData.runData is either the run data the FE pruned (if the old flow is used) or the full run data from before (if the new flow is used).

pinData: workflowData.pinData,
workflowData,
},
Expand Down