Skip to content

Commit

Permalink
move active execution shutdown to ActiveExecutions
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Jan 30, 2024
1 parent 00fb197 commit f350b14
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 58 deletions.
31 changes: 29 additions & 2 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type {
IRun,
ExecutionStatus,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import { ApplicationError, createDeferredPromise, sleep } from 'n8n-workflow';

import type {
ExecutionPayload,
Expand All @@ -17,14 +17,18 @@ import type {
} from '@/Interfaces';
import { isWorkflowIdValid } from '@/utils';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';

@Service()
export class ActiveExecutions {
private activeExecutions: {
[executionId: string]: IExecutingWorkflowData;
} = {};

constructor(private readonly executionRepository: ExecutionRepository) {}
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
) {}

/**
* Add a new active execution
Expand Down Expand Up @@ -179,6 +183,29 @@ export class ActiveExecutions {
return this.getExecution(executionId).status;
}

/** Wait for all active executions to finish */
async shutdown(cancelAll = false) {
let executionIds = Object.keys(this.activeExecutions);

if (cancelAll) {
const stopPromises = executionIds.map(
async (executionId) => await this.stopExecution(executionId),
);

await Promise.allSettled(stopPromises);
}

let count = 0;
while (executionIds.length !== 0) {
if (count++ % 4 === 0) {
this.logger.info(`Waiting for ${executionIds.length} active executions to finish...`);
}

await sleep(500);
executionIds = Object.keys(this.activeExecutions);
}
}

private getExecution(executionId: string): IExecutingWorkflowData {
const execution = this.activeExecutions[executionId];
if (!execution) {
Expand Down
23 changes: 2 additions & 21 deletions packages/cli/src/commands/executeBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Flags } from '@oclif/core';
import fs from 'fs';
import os from 'os';
import type { IRun, ITaskData } from 'n8n-workflow';
import { ApplicationError, jsonParse, sleep } from 'n8n-workflow';
import { ApplicationError, jsonParse } from 'n8n-workflow';
import { sep } from 'path';
import { diff } from 'json-diff';
import pick from 'lodash/pick';
Expand Down Expand Up @@ -118,28 +118,9 @@ export class ExecuteBatch extends BaseCommand {
}

ExecuteBatch.cancelled = true;
const activeExecutionsInstance = Container.get(ActiveExecutions);
const stopPromises = activeExecutionsInstance
.getActiveExecutions()
.map(async (execution) => await activeExecutionsInstance.stopExecution(execution.id));

await Promise.allSettled(stopPromises);
await Container.get(ActiveExecutions).shutdown(true);

setTimeout(() => process.exit(0), 30000);

let executingWorkflows = activeExecutionsInstance.getActiveExecutions();

let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);
executingWorkflows.map((execution) => {
console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`);
});
}
await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
// We may receive true but when called from `process.on`
// we get the signal (SIGINT, etc.)
if (skipExit !== true) {
Expand Down
20 changes: 2 additions & 18 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { createReadStream, createWriteStream, existsSync } from 'fs';
import { pipeline } from 'stream/promises';
import replaceStream from 'replacestream';
import glob from 'fast-glob';
import { sleep, jsonParse } from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';

import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
Expand Down Expand Up @@ -106,23 +106,7 @@ export class Start extends BaseCommand {

await Container.get(InternalHooks).onN8nStop();

// Wait for active workflow executions to finish
const activeExecutionsInstance = Container.get(ActiveExecutions);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();

let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`);

executingWorkflows.map((execution) => {
console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`);
});
}

await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
await Container.get(ActiveExecutions).shutdown();

// Finally shut down Event Bus
await Container.get(MessageEventBus).close();
Expand Down
17 changes: 1 addition & 16 deletions packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Container } from 'typedi';
import { Flags, type Config } from '@oclif/core';
import { sleep } from 'n8n-workflow';

import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
Expand Down Expand Up @@ -42,21 +41,7 @@ export class Webhook extends BaseCommand {
try {
await this.externalHooks?.run('n8n.stop', []);

// Wait for active workflow executions to finish
const activeExecutionsInstance = Container.get(ActiveExecutions);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();

let count = 0;
while (executingWorkflows.length !== 0) {
if (count++ % 4 === 0) {
this.logger.info(
`Waiting for ${executingWorkflows.length} active executions to finish...`,
);
}

await sleep(500);
executingWorkflows = activeExecutionsInstance.getActiveExecutions();
}
await Container.get(ActiveExecutions).shutdown();
} catch (error) {
await this.exitWithCrash('There was an error shutting down n8n.', error);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test/unit/ActiveExecutions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('ActiveExecutions', () => {
let activeExecutions: ActiveExecutions;

beforeEach(() => {
activeExecutions = new ActiveExecutions(executionRepository);
activeExecutions = new ActiveExecutions(mock(), executionRepository);
});

afterEach(() => {
Expand Down

0 comments on commit f350b14

Please sign in to comment.