diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 67616ee4b3f63..03949e7ea5d34 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -204,6 +204,12 @@ export class ActiveExecutions { async shutdown(cancelAll = false) { let executionIds = Object.keys(this.activeExecutions); + if (config.getEnv('executions.mode') === 'regular') { + // removal of active executions will no longer release capacity back, + // so that throttled executions cannot resume during shutdown + this.concurrencyControl.disable(); + } + if (cancelAll) { if (config.getEnv('executions.mode') === 'regular') { await this.concurrencyControl.removeAll(this.activeExecutions); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 62762adbb85db..74cc204490626 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -32,6 +32,7 @@ import { ExecutionService } from '@/executions/execution.service'; import { OwnershipService } from '@/services/ownership.service'; import { WorkflowRunner } from '@/WorkflowRunner'; import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; +import { EventRelay } from '@/eventbus/event-relay.service'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -375,6 +376,10 @@ export class Start extends BaseCommand { projectId: project.id, }; + Container.get(EventRelay).emit('execution-started-during-bootup', { + executionId: execution.id, + }); + // do not block - each execution either runs concurrently or is queued void workflowRunner.run(data, undefined, false, execution.id); } diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index c34ac346acce0..817c3c71abb05 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -12,11 +12,13 @@ import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { IExecutingWorkflowData } from '@/Interfaces'; import type { Telemetry } from '@/telemetry'; +import type { EventRelay } from '@/eventbus/event-relay.service'; describe('ConcurrencyControlService', () => { const logger = mock(); const executionRepository = mock(); const telemetry = mock(); + const eventRelay = mock(); afterEach(() => { config.set('executions.concurrency.productionLimit', -1); @@ -35,7 +37,12 @@ describe('ConcurrencyControlService', () => { /** * Act */ - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); /** * Assert @@ -56,7 +63,7 @@ describe('ConcurrencyControlService', () => { /** * Act */ - new ConcurrencyControlService(logger, executionRepository, telemetry); + new ConcurrencyControlService(logger, executionRepository, telemetry, eventRelay); } catch (error) { /** * Assert @@ -74,7 +81,12 @@ describe('ConcurrencyControlService', () => { /** * Act */ - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); /** * Assert @@ -92,7 +104,12 @@ describe('ConcurrencyControlService', () => { /** * Act */ - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); /** * Act @@ -111,7 +128,12 @@ describe('ConcurrencyControlService', () => { /** * Act */ - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); /** * Assert @@ -135,7 +157,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', 1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); /** @@ -156,7 +183,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', 1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); /** @@ -180,7 +212,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', 1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); /** @@ -201,7 +238,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', 1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); /** @@ -225,7 +267,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', 1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); /** @@ -248,7 +295,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', 1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); /** @@ -271,7 +323,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', 2); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); jest .spyOn(ConcurrencyQueue.prototype, 'getAll') @@ -310,7 +367,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', -1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); /** @@ -333,7 +395,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', -1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); /** @@ -355,7 +422,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', -1); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); /** @@ -385,7 +457,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); config.set('deployment.type', 'cloud'); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); /** * Act @@ -410,7 +487,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); config.set('deployment.type', 'cloud'); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); /** * Act @@ -437,7 +519,12 @@ describe('ConcurrencyControlService', () => { */ config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); config.set('deployment.type', 'cloud'); - const service = new ConcurrencyControlService(logger, executionRepository, telemetry); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventRelay, + ); /** * Act diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 77310b769465e..a71f22b423515 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -8,13 +8,14 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; import type { IExecutingWorkflowData } from '@/Interfaces'; import { Telemetry } from '@/telemetry'; +import { EventRelay } from '@/eventbus/event-relay.service'; export const CLOUD_TEMP_PRODUCTION_LIMIT = 999; export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; @Service() export class ConcurrencyControlService { - private readonly isEnabled: boolean; + private isEnabled: boolean; private readonly productionLimit: number; @@ -28,6 +29,7 @@ export class ConcurrencyControlService { private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, private readonly telemetry: Telemetry, + private readonly eventRelay: EventRelay, ) { this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); @@ -61,6 +63,7 @@ export class ConcurrencyControlService { this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => { this.log('Execution throttled', { executionId }); + this.eventRelay.emit('execution-throttled', { executionId }); }); this.productionQueue.on('execution-released', async (executionId: string) => { @@ -130,6 +133,10 @@ export class ConcurrencyControlService { this.logger.info('Canceled enqueued executions with response promises', { executionIds }); } + disable() { + this.isEnabled = false; + } + // ---------------------------------- // private // ---------------------------------- diff --git a/packages/cli/src/concurrency/concurrency-queue.ts b/packages/cli/src/concurrency/concurrency-queue.ts index 571d1593e86b9..90c62d2efc21e 100644 --- a/packages/cli/src/concurrency/concurrency-queue.ts +++ b/packages/cli/src/concurrency/concurrency-queue.ts @@ -49,7 +49,7 @@ export class ConcurrencyQueue extends EventEmitter { } getAll() { - return new Set(...this.queue.map((item) => item.executionId)); + return new Set(this.queue.map((item) => item.executionId)); } private resolveNext() { diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageExecution.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageExecution.ts new file mode 100644 index 0000000000000..9a8864c7f0b76 --- /dev/null +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageExecution.ts @@ -0,0 +1,45 @@ +import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage'; +import type { JsonObject } from 'n8n-workflow'; +import { EventMessageTypeNames } from 'n8n-workflow'; +import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; +import type { AbstractEventPayload } from './AbstractEventPayload'; +import type { EventNamesExecutionType } from '.'; + +export interface EventPayloadExecution extends AbstractEventPayload { + executionId: string; +} + +export interface EventMessageExecutionOptions extends AbstractEventMessageOptions { + eventName: EventNamesExecutionType; + + payload?: EventPayloadExecution; +} + +export class EventMessageExecution extends AbstractEventMessage { + readonly __type = EventMessageTypeNames.execution; + + eventName: EventNamesExecutionType; + + payload: EventPayloadExecution; + + constructor(options: EventMessageExecutionOptions) { + super(options); + if (options.payload) this.setPayload(options.payload); + if (options.anonymize) { + this.anonymize(); + } + } + + setPayload(payload: EventPayloadExecution): this { + this.payload = payload; + return this; + } + + deserialize(data: JsonObject): this { + if (isEventMessageOptionsWithType(data, this.__type)) { + this.setOptionsOrDefault(data); + if (data.payload) this.setPayload(data.payload as EventPayloadExecution); + } + return this; + } +} diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index 65c323b873ab3..461394d378d94 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -1,5 +1,6 @@ import type { EventMessageAiNode } from './EventMessageAiNode'; import type { EventMessageAudit } from './EventMessageAudit'; +import type { EventMessageExecution } from './EventMessageExecution'; import type { EventMessageGeneric } from './EventMessageGeneric'; import type { EventMessageNode } from './EventMessageNode'; import type { EventMessageWorkflow } from './EventMessageWorkflow'; @@ -13,6 +14,10 @@ export const eventNamesWorkflow = [ ] as const; export const eventNamesGeneric = ['n8n.worker.started', 'n8n.worker.stopped'] as const; export const eventNamesNode = ['n8n.node.started', 'n8n.node.finished'] as const; +export const eventNamesExecution = [ + 'n8n.execution.throttled', + 'n8n.execution.started-during-bootup', +] as const; export const eventNamesAudit = [ 'n8n.audit.user.login.success', 'n8n.audit.user.login.failed', @@ -42,12 +47,14 @@ export const eventNamesAudit = [ export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; export type EventNamesAuditType = (typeof eventNamesAudit)[number]; export type EventNamesNodeType = (typeof eventNamesNode)[number]; +export type EventNamesExecutionType = (typeof eventNamesExecution)[number]; export type EventNamesGenericType = (typeof eventNamesGeneric)[number]; export type EventNamesTypes = | EventNamesAuditType | EventNamesWorkflowType | EventNamesNodeType + | EventNamesExecutionType | EventNamesGenericType | EventNamesAiNodesType | 'n8n.destination.test'; @@ -65,4 +72,5 @@ export type EventMessageTypes = | EventMessageWorkflow | EventMessageAudit | EventMessageNode + | EventMessageExecution | EventMessageAiNode; diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 8322897377bef..1db489d45e389 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -35,6 +35,8 @@ import { type EventMessageAiNodeOptions, } from '../EventMessageClasses/EventMessageAiNode'; import { License } from '@/License'; +import type { EventMessageExecutionOptions } from '../EventMessageClasses/EventMessageExecution'; +import { EventMessageExecution } from '../EventMessageClasses/EventMessageExecution'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -397,4 +399,8 @@ export class MessageEventBus extends EventEmitter { async sendAiNodeEvent(options: EventMessageAiNodeOptions) { await this.send(new EventMessageAiNode(options)); } + + async sendExecutionEvent(options: EventMessageExecutionOptions) { + await this.send(new EventMessageExecution(options)); + } } diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 6c59b2e24e9cc..87a4326d2619f 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -222,6 +222,8 @@ export class MessageEventBusLogWriter { case 'n8n.workflow.success': case 'n8n.workflow.failed': case 'n8n.workflow.crashed': + case 'n8n.execution.throttled': + case 'n8n.execution.started-during-bootup': delete results.unfinishedExecutions[executionId]; break; case 'n8n.node.started': diff --git a/packages/cli/src/eventbus/audit-event-relay.service.ts b/packages/cli/src/eventbus/audit-event-relay.service.ts index 59efa0533f9db..fde3abf31a2fc 100644 --- a/packages/cli/src/eventbus/audit-event-relay.service.ts +++ b/packages/cli/src/eventbus/audit-event-relay.service.ts @@ -51,6 +51,10 @@ export class AuditEventRelay { ); this.eventRelay.on('community-package-updated', (event) => this.communityPackageUpdated(event)); this.eventRelay.on('community-package-deleted', (event) => this.communityPackageDeleted(event)); + this.eventRelay.on('execution-throttled', (event) => this.executionThrottled(event)); + this.eventRelay.on('execution-started-during-bootup', (event) => + this.executionStartedDuringBootup(event), + ); } /** @@ -339,4 +343,22 @@ export class AuditEventRelay { payload: { ...user, ...rest }, }); } + + /** + * Execution + */ + + private executionThrottled({ executionId }: Event['execution-throttled']) { + void this.eventBus.sendExecutionEvent({ + eventName: 'n8n.execution.throttled', + payload: { executionId }, + }); + } + + private executionStartedDuringBootup({ executionId }: Event['execution-started-during-bootup']) { + void this.eventBus.sendExecutionEvent({ + eventName: 'n8n.execution.started-during-bootup', + payload: { executionId }, + }); + } } diff --git a/packages/cli/src/eventbus/event.types.ts b/packages/cli/src/eventbus/event.types.ts index b4162f0333736..12bfa12ae8c23 100644 --- a/packages/cli/src/eventbus/event.types.ts +++ b/packages/cli/src/eventbus/event.types.ts @@ -182,4 +182,12 @@ export type Event = { packageAuthor?: string; packageAuthorEmail?: string; }; + + 'execution-throttled': { + executionId: string; + }; + + 'execution-started-during-bootup': { + executionId: string; + }; }; diff --git a/packages/workflow/src/MessageEventBus.ts b/packages/workflow/src/MessageEventBus.ts index a6d44ced9f24d..48c80e9b52220 100644 --- a/packages/workflow/src/MessageEventBus.ts +++ b/packages/workflow/src/MessageEventBus.ts @@ -11,6 +11,7 @@ export const enum EventMessageTypeNames { confirm = '$$EventMessageConfirm', workflow = '$$EventMessageWorkflow', node = '$$EventMessageNode', + execution = '$$EventMessageExecution', aiNode = '$$EventMessageAiNode', }