diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index ac5f586b0ad43..1be878c165dfd 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -29,7 +29,7 @@ import { ExternalHooks } from '@/ExternalHooks'; import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; import type { Job, JobData, JobResult } from '@/scaling/types'; -import { ScalingService } from '@/scaling/scaling.service'; +import type { ScalingService } from '@/scaling/scaling.service'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; @@ -40,7 +40,7 @@ import { EventService } from './events/event.service'; @Service() export class WorkflowRunner { - private readonly scalingService: ScalingService; + private scalingService: ScalingService; private executionsMode = config.getEnv('executions.mode'); @@ -53,11 +53,7 @@ export class WorkflowRunner { private readonly nodeTypes: NodeTypes, private readonly permissionChecker: PermissionChecker, private readonly eventService: EventService, - ) { - if (this.executionsMode === 'queue') { - this.scalingService = Container.get(ScalingService); - } - } + ) {} /** The process did error */ async processError( @@ -360,6 +356,11 @@ export class WorkflowRunner { loadStaticData: !!loadStaticData, }; + if (!this.scalingService) { + const { ScalingService } = await import('@/scaling/scaling.service'); + this.scalingService = Container.get(ScalingService); + } + let priority = 100; if (realtime === true) { // Jobs which require a direct response get a higher priority @@ -404,7 +405,7 @@ export class WorkflowRunner { async (resolve, reject, onCancel) => { onCancel.shouldReject = false; onCancel(async () => { - await Container.get(ScalingService).stopJob(job); + await this.scalingService.stopJob(job); // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // "workflowExecuteAfter" which we require. diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 505e2e216170d..d9c197c850385 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -4,7 +4,6 @@ import { ApplicationError } from 'n8n-workflow'; import config from '@/config'; import { ActiveExecutions } from '@/ActiveExecutions'; -import { ScalingService } from '@/scaling/scaling.service'; import { WebhookServer } from '@/webhooks/WebhookServer'; import { BaseCommand } from './BaseCommand'; @@ -96,6 +95,7 @@ export class Webhook extends BaseCommand { ); } + const { ScalingService } = await import('@/scaling/scaling.service'); await Container.get(ScalingService).setupQueue(); await this.server.start(); this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 18f039029c020..cadf4f708a8bb 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -7,7 +7,7 @@ import { sleep, ApplicationError } from 'n8n-workflow'; import * as Db from '@/Db'; import * as ResponseHelper from '@/ResponseHelper'; import config from '@/config'; -import { ScalingService } from '@/scaling/scaling.service'; +import type { ScalingService } from '@/scaling/scaling.service'; import { N8N_VERSION, inTest } from '@/constants'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; @@ -171,6 +171,7 @@ export class Worker extends BaseCommand { } async initScalingService() { + const { ScalingService } = await import('@/scaling/scaling.service'); this.scalingService = Container.get(ScalingService); await this.scalingService.setupQueue(); diff --git a/packages/cli/src/executions/__tests__/execution.service.test.ts b/packages/cli/src/executions/__tests__/execution.service.test.ts index 879ee707cdc8d..28c76e1c58faf 100644 --- a/packages/cli/src/executions/__tests__/execution.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution.service.test.ts @@ -6,15 +6,16 @@ import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.err import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error'; import type { ActiveExecutions } from '@/ActiveExecutions'; import type { IExecutionResponse } from '@/Interfaces'; -import type { ScalingService } from '@/scaling/scaling.service'; +import { ScalingService } from '@/scaling/scaling.service'; import type { WaitTracker } from '@/WaitTracker'; import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; import type { ExecutionRequest } from '@/executions/execution.types'; import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import type { Job } from '@/scaling/types'; +import { mockInstance } from '@test/mocking'; describe('ExecutionService', () => { - const scalingService = mock(); + const scalingService = mockInstance(ScalingService); const activeExecutions = mock(); const executionRepository = mock(); const waitTracker = mock(); @@ -23,7 +24,6 @@ describe('ExecutionService', () => { const executionService = new ExecutionService( mock(), mock(), - scalingService, activeExecutions, executionRepository, mock(), diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 849d06c968c7e..b4e8b335739be 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -1,4 +1,4 @@ -import { Service } from 'typedi'; +import { Container, Service } from 'typedi'; import { GlobalConfig } from '@n8n/config'; import { validate as jsonSchemaValidate } from 'jsonschema'; import type { @@ -24,7 +24,6 @@ import type { IWorkflowExecutionDataProcess, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import { ScalingService } from '@/scaling/scaling.service'; import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types'; import { WorkflowRunner } from '@/WorkflowRunner'; import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository'; @@ -85,7 +84,6 @@ export class ExecutionService { constructor( private readonly globalConfig: GlobalConfig, private readonly logger: Logger, - private readonly scalingService: ScalingService, private readonly activeExecutions: ActiveExecutions, private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, @@ -471,12 +469,14 @@ export class ExecutionService { this.waitTracker.stopExecution(execution.id); } - const jobs = await this.scalingService.findJobsByStatus(['active', 'waiting']); + const { ScalingService } = await import('@/scaling/scaling.service'); + const scalingService = Container.get(ScalingService); + const jobs = await scalingService.findJobsByStatus(['active', 'waiting']); const job = jobs.find(({ data }) => data.executionId === execution.id); if (job) { - await this.scalingService.stopJob(job); + await scalingService.stopJob(job); } else { this.logger.debug('Job to stop not in queue', { executionId: execution.id }); } diff --git a/packages/cli/test/integration/execution.service.integration.test.ts b/packages/cli/test/integration/execution.service.integration.test.ts index 6f29950bf48d4..41dcec3ce60e6 100644 --- a/packages/cli/test/integration/execution.service.integration.test.ts +++ b/packages/cli/test/integration/execution.service.integration.test.ts @@ -22,7 +22,6 @@ describe('ExecutionService', () => { mock(), mock(), mock(), - mock(), executionRepository, Container.get(WorkflowRepository), mock(),