Skip to content

Commit

Permalink
feat: Implement new partial execution logic for acyclic workflows (no…
Browse files Browse the repository at this point in the history
…-changelog) (#10256)

Co-authored-by: Tomi Turtiainen <[email protected]>
  • Loading branch information
despairblue and tomi authored Sep 18, 2024
1 parent 73f89ef commit 2a084f9
Show file tree
Hide file tree
Showing 31 changed files with 2,367 additions and 20 deletions.
10 changes: 5 additions & 5 deletions cypress/e2e/19-execution.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ describe('Execution', () => {

workflowPage.getters.clearExecutionDataButton().should('be.visible');

cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun');
cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');

workflowPage.getters
.canvasNodeByName('do something with them')
Expand All @@ -525,7 +525,7 @@ describe('Execution', () => {

workflowPage.getters.zoomToFitButton().click();

cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun');
cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');

workflowPage.getters
.canvasNodeByName('If')
Expand All @@ -547,7 +547,7 @@ describe('Execution', () => {

workflowPage.getters.clearExecutionDataButton().should('be.visible');

cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun');
cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');

workflowPage.getters
.canvasNodeByName('NoOp2')
Expand Down Expand Up @@ -576,7 +576,7 @@ describe('Execution', () => {
it('should successfully execute partial executions with nodes attached to the second output', () => {
cy.createFixtureWorkflow('Test_Workflow_pairedItem_incomplete_manual_bug.json');

cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun');
cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');

workflowPage.getters.zoomToFitButton().click();
workflowPage.getters.executeWorkflowButton().click();
Expand All @@ -596,7 +596,7 @@ describe('Execution', () => {
it('should execute workflow partially up to the node that has issues', () => {
cy.createFixtureWorkflow('Test_workflow_partial_execution_with_missing_credentials.json');

cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun');
cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');

workflowPage.getters.zoomToFitButton().click();
workflowPage.getters.executeWorkflowButton().click();
Expand Down
2 changes: 1 addition & 1 deletion cypress/e2e/28-debug.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('Debug', () => {
it('should be able to debug executions', () => {
cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions');
cy.intercept('GET', '/rest/executions/*').as('getExecution');
cy.intercept('POST', '/rest/workflows/**/run').as('postWorkflowRun');
cy.intercept('POST', '/rest/workflows/**/run?**').as('postWorkflowRun');

cy.signinAsOwner();

Expand Down
2 changes: 1 addition & 1 deletion cypress/e2e/30-editor-after-route-changes.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ describe('Editor actions should work', () => {
it('after switching between Editor and Debug', () => {
cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions');
cy.intercept('GET', '/rest/executions/*').as('getExecution');
cy.intercept('POST', '/rest/workflows/**/run').as('postWorkflowRun');
cy.intercept('POST', '/rest/workflows/**/run?**').as('postWorkflowRun');

editWorkflowAndDeactivate();
workflowPage.actions.executeWorkflow();
Expand Down
2 changes: 1 addition & 1 deletion cypress/pages/workflow-executions-tab.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class WorkflowExecutionsTab extends BasePage {
},
createManualExecutions: (count: number) => {
for (let i = 0; i < count; i++) {
cy.intercept('POST', '/rest/workflows/**/run').as('workflowExecution');
cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowExecution');
workflowPage.actions.executeWorkflow();
cy.wait('@workflowExecution');
}
Expand Down
2 changes: 1 addition & 1 deletion cypress/utils/executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export function runMockWorkflowExecution({
}) {
const executionId = nanoid(8);

cy.intercept('POST', '/rest/workflows/**/run', {
cy.intercept('POST', '/rest/workflows/**/run?**', {
statusCode: 201,
body: {
data: {
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,4 +640,13 @@ export const schema = {
env: 'N8N_PROXY_HOPS',
doc: 'Number of reverse-proxies n8n is running behind',
},

featureFlags: {
partialExecutionVersionDefault: {
format: String,
default: '0',
env: 'PARTIAL_EXECUTION_VERSION_DEFAULT',
doc: 'Set this to 1 to enable the new partial execution logic by default.',
},
},
};
30 changes: 22 additions & 8 deletions packages/cli/src/workflow-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ export class WorkflowRunner {
}
}

/** Run the workflow */
/** Run the workflow
* @param realtime This is used in queue mode to change the priority of an execution, making sure they are picked up quicker.
*/
async run(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
Expand Down Expand Up @@ -278,6 +280,7 @@ export class WorkflowRunner {
data.startNodes === undefined ||
data.startNodes.length === 0
) {
// Full Execution
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId,
});
Expand All @@ -294,16 +297,27 @@ export class WorkflowRunner {
data.pinData,
);
} else {
// Partial Execution
this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);

if (data.partialExecutionVersion === '1') {
workflowExecution = workflowExecute.runPartialWorkflow2(
workflow,
data.runData,
data.destinationNode,
data.pinData,
);
} else {
workflowExecution = workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);
}
}

this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/workflows/workflow-execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export class WorkflowExecutionService {
{ workflowData, runData, startNodes, destinationNode }: WorkflowRequest.ManualRunPayload,
user: User,
pushRef?: string,
partialExecutionVersion?: string,
) {
const pinData = workflowData.pinData;
const pinnedTrigger = this.selectPinnedActivatorStarter(
Expand Down Expand Up @@ -135,6 +136,7 @@ export class WorkflowExecutionService {
startNodes,
workflowData,
userId: user.id,
partialExecutionVersion: partialExecutionVersion ?? '0',
};

const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name];
Expand Down
7 changes: 6 additions & 1 deletion packages/cli/src/workflows/workflow.request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ export declare namespace WorkflowRequest {

type NewName = AuthenticatedRequest<{}, {}, {}, { name?: string }>;

type ManualRun = AuthenticatedRequest<{ workflowId: string }, {}, ManualRunPayload>;
type ManualRun = AuthenticatedRequest<
{ workflowId: string },
{},
ManualRunPayload,
{ partialExecutionVersion?: string }
>;

type Share = AuthenticatedRequest<{ workflowId: string }, {}, { shareWithIds: string[] }>;

Expand Down
3 changes: 3 additions & 0 deletions packages/cli/src/workflows/workflows.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ export class WorkflowsController {
req.body,
req.user,
req.headers['push-ref'] as string,
req.query.partialExecutionVersion === '-1'
? config.getEnv('featureFlags.partialExecutionVersionDefault')
: req.query.partialExecutionVersion,
);
}

Expand Down
Loading

0 comments on commit 2a084f9

Please sign in to comment.