Skip to content

Commit

Permalink
fix(core): Do not load ScalingService in regular mode (no-changelog) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy authored Aug 8, 2024
1 parent eef4fb8 commit 1869c39
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 19 deletions.
17 changes: 9 additions & 8 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { ExternalHooks } from '@/ExternalHooks';
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobResult } from '@/scaling/types';
import { ScalingService } from '@/scaling/scaling.service';
import type { ScalingService } from '@/scaling/scaling.service';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
Expand All @@ -40,7 +40,7 @@ import { EventService } from './events/event.service';

@Service()
export class WorkflowRunner {
private readonly scalingService: ScalingService;
private scalingService: ScalingService;

private executionsMode = config.getEnv('executions.mode');

Expand All @@ -53,11 +53,7 @@ export class WorkflowRunner {
private readonly nodeTypes: NodeTypes,
private readonly permissionChecker: PermissionChecker,
private readonly eventService: EventService,
) {
if (this.executionsMode === 'queue') {
this.scalingService = Container.get(ScalingService);
}
}
) {}

/** The process did error */
async processError(
Expand Down Expand Up @@ -360,6 +356,11 @@ export class WorkflowRunner {
loadStaticData: !!loadStaticData,
};

if (!this.scalingService) {
const { ScalingService } = await import('@/scaling/scaling.service');
this.scalingService = Container.get(ScalingService);
}

let priority = 100;
if (realtime === true) {
// Jobs which require a direct response get a higher priority
Expand Down Expand Up @@ -404,7 +405,7 @@ export class WorkflowRunner {
async (resolve, reject, onCancel) => {
onCancel.shouldReject = false;
onCancel(async () => {
await Container.get(ScalingService).stopJob(job);
await this.scalingService.stopJob(job);

// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require.
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { ApplicationError } from 'n8n-workflow';

import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ScalingService } from '@/scaling/scaling.service';
import { WebhookServer } from '@/webhooks/WebhookServer';
import { BaseCommand } from './BaseCommand';

Expand Down Expand Up @@ -96,6 +95,7 @@ export class Webhook extends BaseCommand {
);
}

const { ScalingService } = await import('@/scaling/scaling.service');
await Container.get(ScalingService).setupQueue();
await this.server.start();
this.logger.debug(`Webhook listener ID: ${this.server.uniqueInstanceId}`);
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { sleep, ApplicationError } from 'n8n-workflow';
import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
import config from '@/config';
import { ScalingService } from '@/scaling/scaling.service';
import type { ScalingService } from '@/scaling/scaling.service';
import { N8N_VERSION, inTest } from '@/constants';
import type { ICredentialsOverwrite } from '@/Interfaces';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
Expand Down Expand Up @@ -171,6 +171,7 @@ export class Worker extends BaseCommand {
}

async initScalingService() {
const { ScalingService } = await import('@/scaling/scaling.service');
this.scalingService = Container.get(ScalingService);

await this.scalingService.setupQueue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.err
import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error';
import type { ActiveExecutions } from '@/ActiveExecutions';
import type { IExecutionResponse } from '@/Interfaces';
import type { ScalingService } from '@/scaling/scaling.service';
import { ScalingService } from '@/scaling/scaling.service';
import type { WaitTracker } from '@/WaitTracker';
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { ExecutionRequest } from '@/executions/execution.types';
import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import type { Job } from '@/scaling/types';
import { mockInstance } from '@test/mocking';

describe('ExecutionService', () => {
const scalingService = mock<ScalingService>();
const scalingService = mockInstance(ScalingService);
const activeExecutions = mock<ActiveExecutions>();
const executionRepository = mock<ExecutionRepository>();
const waitTracker = mock<WaitTracker>();
Expand All @@ -23,7 +24,6 @@ describe('ExecutionService', () => {
const executionService = new ExecutionService(
mock(),
mock(),
scalingService,
activeExecutions,
executionRepository,
mock(),
Expand Down
10 changes: 5 additions & 5 deletions packages/cli/src/executions/execution.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Service } from 'typedi';
import { Container, Service } from 'typedi';
import { GlobalConfig } from '@n8n/config';
import { validate as jsonSchemaValidate } from 'jsonschema';
import type {
Expand All @@ -24,7 +24,6 @@ import type {
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { ScalingService } from '@/scaling/scaling.service';
import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types';
import { WorkflowRunner } from '@/WorkflowRunner';
import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository';
Expand Down Expand Up @@ -85,7 +84,6 @@ export class ExecutionService {
constructor(
private readonly globalConfig: GlobalConfig,
private readonly logger: Logger,
private readonly scalingService: ScalingService,
private readonly activeExecutions: ActiveExecutions,
private readonly executionRepository: ExecutionRepository,
private readonly workflowRepository: WorkflowRepository,
Expand Down Expand Up @@ -471,12 +469,14 @@ export class ExecutionService {
this.waitTracker.stopExecution(execution.id);
}

const jobs = await this.scalingService.findJobsByStatus(['active', 'waiting']);
const { ScalingService } = await import('@/scaling/scaling.service');
const scalingService = Container.get(ScalingService);
const jobs = await scalingService.findJobsByStatus(['active', 'waiting']);

const job = jobs.find(({ data }) => data.executionId === execution.id);

if (job) {
await this.scalingService.stopJob(job);
await scalingService.stopJob(job);
} else {
this.logger.debug('Job to stop not in queue', { executionId: execution.id });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ describe('ExecutionService', () => {
mock(),
mock(),
mock(),
mock(),
executionRepository,
Container.get(WorkflowRepository),
mock(),
Expand Down

0 comments on commit 1869c39

Please sign in to comment.