Skip to content

Commit

Permalink
feat(core): Expand crash recovery to cover queue mode (#9676)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Jun 18, 2024
1 parent 7e44cd7 commit c58621a
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 18 deletions.
9 changes: 9 additions & 0 deletions packages/cli/src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ export class Queue {
return await this.jobQueue.getJobs(jobTypes);
}

/**
* Get IDs of executions that are currently in progress in the queue.
*/
async getInProgressExecutionIds() {
const inProgressJobs = await this.getJobs(['active', 'waiting']);

return new Set(inProgressJobs.map((job) => job.data.executionId));
}

async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
return await this.jobQueue.process(concurrency, fn);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -65,8 +66,6 @@ export class Start extends BaseCommand {

protected server = Container.get(Server);

private pruningService: PruningService;

constructor(argv: string[], cmdConfig: Config) {
super(argv, cmdConfig);
this.setInstanceType('main');
Expand Down Expand Up @@ -294,6 +293,7 @@ export class Start extends BaseCommand {
await this.server.start();

Container.get(PruningService).init();
Container.get(ExecutionRecoveryService).init();

if (config.getEnv('executions.mode') === 'regular') {
await this.runEnqueuedExecutions();
Expand Down
15 changes: 15 additions & 0 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,21 @@ export const schema = {
default: 10000,
env: 'EXECUTIONS_DATA_PRUNE_MAX_COUNT',
},

queueRecovery: {
interval: {
doc: 'How often (minutes) to check for queue recovery',
format: Number,
default: 180,
env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL',
},
batchSize: {
doc: 'Size of batch of executions to check for queue recovery',
format: Number,
default: 100,
env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH',
},
},
},

queue: {
Expand Down
18 changes: 15 additions & 3 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
},
);

this.logger.info('[Execution Recovery] Marked executions as `crashed`', {
executionIds,
});
this.logger.info('Marked executions as `crashed`', { executionIds });
}

/**
Expand Down Expand Up @@ -773,4 +771,18 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {

return executions.map(({ id }) => id);
}

/**
* Retrieve a batch of execution IDs with `new` or `running` status, in most recent order.
*/
async getInProgressExecutionIds(batchSize: number) {
const executions = await this.find({
select: ['id'],
where: { status: In(['new', 'running']) },
order: { startedAt: 'DESC' },
take: batchSize,
});

return executions.map(({ id }) => id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ import { createWorkflow } from '@test-integration/db/workflows';
import { createExecution } from '@test-integration/db/executions';
import * as testDb from '@test-integration/testDb';

import { NodeConnectionType } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';
import { OrchestrationService } from '@/services/orchestration.service';
import config from '@/config';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';

import { InternalHooks } from '@/InternalHooks';
import { Push } from '@/push';
import { ARTIFICIAL_TASK_DATA } from '@/constants';
import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode';
import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';

import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses';
import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
import { NodeConnectionType } from 'n8n-workflow';
import { OrchestrationService } from '@/services/orchestration.service';
import config from '@/config';
import type { Logger } from '@/Logger';

/**
* Workflow producing an execution whose data will be truncated by an instance crash.
Expand Down Expand Up @@ -174,20 +176,20 @@ export const setupMessages = (executionId: string, workflowName: string): EventM
};

describe('ExecutionRecoveryService', () => {
let executionRecoveryService: ExecutionRecoveryService;
let push: Push;
let executionRepository: ExecutionRepository;
let executionRecoveryService: ExecutionRecoveryService;
let orchestrationService: OrchestrationService;
let executionRepository: ExecutionRepository;

beforeAll(async () => {
await testDb.init();

mockInstance(InternalHooks);
push = mockInstance(Push);
executionRepository = Container.get(ExecutionRepository);
orchestrationService = Container.get(OrchestrationService);

mockInstance(InternalHooks);
executionRecoveryService = new ExecutionRecoveryService(
mock<Logger>(),
push,
executionRepository,
orchestrationService,
Expand All @@ -199,13 +201,78 @@ describe('ExecutionRecoveryService', () => {
});

afterEach(async () => {
config.load(config.default);
jest.restoreAllMocks();
await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']);
executionRecoveryService.shutdown();
});

afterAll(async () => {
await testDb.terminate();
});

describe('scheduleQueueRecovery', () => {
describe('queue mode', () => {
it('if leader, should schedule queue recovery', () => {
/**
* Arrange
*/
config.set('executions.mode', 'queue');
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true);
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
* Act
*/
executionRecoveryService.init();

/**
* Assert
*/
expect(scheduleSpy).toHaveBeenCalled();
});

it('if follower, should do nothing', () => {
/**
* Arrange
*/
config.set('executions.mode', 'queue');
jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false);
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
* Act
*/
executionRecoveryService.init();

/**
* Assert
*/
expect(scheduleSpy).not.toHaveBeenCalled();
});
});

describe('regular mode', () => {
it('should do nothing', () => {
/**
* Arrange
*/
config.set('executions.mode', 'regular');
const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');

/**
* Act
*/
executionRecoveryService.init();

/**
* Assert
*/
expect(scheduleSpy).not.toHaveBeenCalled();
});
});
});

describe('recoverFromLogs', () => {
describe('if follower', () => {
test('should do nothing', async () => {
Expand Down
Loading

0 comments on commit c58621a

Please sign in to comment.