From b8e72c4377b759315c77a134d4791b073f2ff5fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Thu, 21 Dec 2023 17:37:08 +0100 Subject: [PATCH] fix(core): Remove circular dependency in WorkflowService and ActiveWorkflowRunner (#8128) A circular dependency between `WorkflowService` and `ActiveWorkflowRunner` is sometimes causing `this.activeWorkflowRunner` to be `undefined` in `WorkflowService`. Breaking this circular dependency should hopefully fix this issue. - [x] PR title and summary are descriptive - [ ] Tests included --- packages/cli/src/ActiveWorkflowRunner.ts | 23 ++++--- packages/cli/src/WebhookHelpers.ts | 4 -- .../cli/src/WorkflowExecuteAdditionalData.ts | 12 ++-- .../repositories/workflow.repository.ts | 27 +++++++- .../src/executions/executions.service.ee.ts | 4 +- .../cli/src/workflows/workflow.service.ee.ts | 4 +- .../cli/src/workflows/workflow.service.ts | 67 ++----------------- .../workflows/workflowStaticData.service.ts | 41 ++++++++++++ .../src/workflows/workflows.controller.ee.ts | 3 +- 9 files changed, 99 insertions(+), 86 deletions(-) create mode 100644 packages/cli/src/workflows/workflowStaticData.service.ts diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 5b4a15f40c39a..61be101f142a6 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -1,8 +1,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ - -import { Container, Service } from 'typedi'; +import { Service } from 'typedi'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import type { @@ -59,7 +58,6 @@ import { NodeTypes } from '@/NodeTypes'; import { WorkflowRunner } from '@/WorkflowRunner'; import { ExternalHooks } from '@/ExternalHooks'; import { whereClause } from './UserManagement/UserManagementHelper'; -import { WorkflowService } from './workflows/workflow.service'; import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error'; import { In } from 'typeorm'; import { WebhookService } from './services/webhook.service'; @@ -70,6 +68,7 @@ import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee' import { ActivationErrorsService } from '@/ActivationErrors.service'; import type { Scope } from '@n8n/permissions'; import { NotFoundError } from './errors/response-errors/not-found.error'; +import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; @Service() export class ActiveWorkflowRunner implements IWebhookManager { @@ -94,6 +93,8 @@ export class ActiveWorkflowRunner implements IWebhookManager { private readonly sharedWorkflowRepository: SharedWorkflowRepository, private readonly multiMainSetup: MultiMainSetup, private readonly activationErrorsService: ActivationErrorsService, + private readonly executionService: ExecutionsService, + private readonly workflowStaticDataService: WorkflowStaticDataService, ) {} async init() { @@ -213,10 +214,12 @@ export class ActiveWorkflowRunner implements IWebhookManager { undefined, request, response, - (error: Error | null, data: object) => { + async (error: Error | null, data: object) => { if (error !== null) { return reject(error); } + // Save static data if it changed + await this.workflowStaticDataService.saveStaticData(workflow); resolve(data); }, ); @@ -412,7 +415,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { } await this.webhookService.populateCache(); - await Container.get(WorkflowService).saveStaticData(workflow); + await this.workflowStaticDataService.saveStaticData(workflow); } /** @@ -451,7 +454,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update'); } - await Container.get(WorkflowService).saveStaticData(workflow); + await this.workflowStaticDataService.saveStaticData(workflow); await this.webhookService.deleteWorkflowWebhooks(workflowId); } @@ -524,7 +527,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { donePromise?: IDeferredPromise, ): void => { this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`); - void Container.get(WorkflowService).saveStaticData(workflow); + void this.workflowStaticDataService.saveStaticData(workflow); const executePromise = this.runWorkflow( workflowData, node, @@ -579,7 +582,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { donePromise?: IDeferredPromise, ): void => { this.logger.debug(`Received trigger for workflow "${workflow.name}"`); - void Container.get(WorkflowService).saveStaticData(workflow); + void this.workflowStaticDataService.saveStaticData(workflow); const executePromise = this.runWorkflow( workflowData, @@ -814,7 +817,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { await this.activationErrorsService.unset(workflowId); const triggerCount = this.countTriggers(workflow, additionalData); - await Container.get(WorkflowService).updateWorkflowTriggerCount(workflow.id, triggerCount); + await this.workflowRepository.updateWorkflowTriggerCount(workflow.id, triggerCount); } catch (e) { const error = e instanceof Error ? e : new Error(`${e}`); await this.activationErrorsService.set(workflowId, error.message); @@ -824,7 +827,7 @@ export class ActiveWorkflowRunner implements IWebhookManager { // If for example webhooks get created it sometimes has to save the // id of them in the static data. So make sure that data gets persisted. - await Container.get(WorkflowService).saveStaticData(workflow); + await this.workflowStaticDataService.saveStaticData(workflow); } /** diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 0446d5a92b6fa..f218ec11c1b7a 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -60,7 +60,6 @@ import type { WorkflowEntity } from '@db/entities/WorkflowEntity'; import { EventsService } from '@/services/events.service'; import { OwnershipService } from './services/ownership.service'; import { parseBody } from './middlewares'; -import { WorkflowService } from './workflows/workflow.service'; import { Logger } from './Logger'; import { NotFoundError } from './errors/response-errors/not-found.error'; import { InternalServerError } from './errors/response-errors/internal-server.error'; @@ -386,9 +385,6 @@ export async function executeWebhook( }; } - // Save static data if it changed - await Container.get(WorkflowService).saveStaticData(workflow); - const additionalKeys: IWorkflowDataProxyAdditionalKeys = { $executionId: executionId, }; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 1f6bcc627b4e9..e81d97098273f 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -52,7 +52,6 @@ import * as WebhookHelpers from '@/WebhookHelpers'; import * as WorkflowHelpers from '@/WorkflowHelpers'; import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; import { PermissionChecker } from './UserManagement/PermissionChecker'; -import { WorkflowService } from './workflows/workflow.service'; import { InternalHooks } from '@/InternalHooks'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { EventsService } from '@/services/events.service'; @@ -67,6 +66,8 @@ import { restoreBinaryDataId } from './executionLifecycleHooks/restoreBinaryData import { toSaveSettings } from './executionLifecycleHooks/toSaveSettings'; import { Logger } from './Logger'; import { saveExecutionProgress } from './executionLifecycleHooks/saveExecutionProgress'; +import { WorkflowStaticDataService } from './workflows/workflowStaticData.service'; +import { WorkflowRepository } from './databases/repositories/workflow.repository'; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); @@ -418,7 +419,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) { // Workflow is saved so update in database try { - await Container.get(WorkflowService).saveStaticDataById( + await Container.get(WorkflowStaticDataService).saveStaticDataById( this.workflowData.id as string, newStaticData, ); @@ -564,7 +565,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { if (isWorkflowIdValid(this.workflowData.id) && newStaticData) { // Workflow is saved so update in database try { - await Container.get(WorkflowService).saveStaticDataById( + await Container.get(WorkflowStaticDataService).saveStaticDataById( this.workflowData.id as string, newStaticData, ); @@ -714,7 +715,10 @@ export async function getWorkflowData( if (workflowInfo.id !== undefined) { const relations = config.getEnv('workflowTagsDisabled') ? [] : ['tags']; - workflowData = await Container.get(WorkflowService).get({ id: workflowInfo.id }, { relations }); + workflowData = await Container.get(WorkflowRepository).get( + { id: workflowInfo.id }, + { relations }, + ); if (workflowData === undefined || workflowData === null) { throw new ApplicationError('Workflow does not exist.', { diff --git a/packages/cli/src/databases/repositories/workflow.repository.ts b/packages/cli/src/databases/repositories/workflow.repository.ts index d5b193ff2678c..990b2b829cb45 100644 --- a/packages/cli/src/databases/repositories/workflow.repository.ts +++ b/packages/cli/src/databases/repositories/workflow.repository.ts @@ -1,5 +1,6 @@ import { Service } from 'typedi'; -import { DataSource, Repository } from 'typeorm'; +import { DataSource, Repository, type UpdateResult, type FindOptionsWhere } from 'typeorm'; +import config from '@/config'; import { WorkflowEntity } from '../entities/WorkflowEntity'; @Service() @@ -8,6 +9,13 @@ export class WorkflowRepository extends Repository { super(WorkflowEntity, dataSource.manager); } + async get(where: FindOptionsWhere, options?: { relations: string[] }) { + return this.findOne({ + where, + relations: options?.relations, + }); + } + async getAllActive() { return this.find({ where: { active: true }, @@ -28,4 +36,21 @@ export class WorkflowRepository extends Repository { }); return totalTriggerCount ?? 0; } + + async updateWorkflowTriggerCount(id: string, triggerCount: number): Promise { + const qb = this.createQueryBuilder('workflow'); + return qb + .update() + .set({ + triggerCount, + updatedAt: () => { + if (['mysqldb', 'mariadb'].includes(config.getEnv('database.type'))) { + return 'updatedAt'; + } + return '"updatedAt"'; + }, + }) + .where('id = :id', { id }) + .execute(); + } } diff --git a/packages/cli/src/executions/executions.service.ee.ts b/packages/cli/src/executions/executions.service.ee.ts index 9fbfa8ca71919..662ce3875948f 100644 --- a/packages/cli/src/executions/executions.service.ee.ts +++ b/packages/cli/src/executions/executions.service.ee.ts @@ -6,7 +6,7 @@ import type { IExecutionResponse, IExecutionFlattedResponse } from '@/Interfaces import { EnterpriseWorkflowService } from '../workflows/workflow.service.ee'; import type { WorkflowWithSharingsAndCredentials } from '@/workflows/workflows.types'; import Container from 'typedi'; -import { WorkflowService } from '@/workflows/workflow.service'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; export class EEExecutionsService extends ExecutionsService { /** @@ -26,7 +26,7 @@ export class EEExecutionsService extends ExecutionsService { const relations = ['shared', 'shared.user', 'shared.role']; - const workflow = (await Container.get(WorkflowService).get( + const workflow = (await Container.get(WorkflowRepository).get( { id: execution.workflowId }, { relations }, )) as WorkflowWithSharingsAndCredentials; diff --git a/packages/cli/src/workflows/workflow.service.ee.ts b/packages/cli/src/workflows/workflow.service.ee.ts index 57c2e88ea8c23..6431a3654c326 100644 --- a/packages/cli/src/workflows/workflow.service.ee.ts +++ b/packages/cli/src/workflows/workflow.service.ee.ts @@ -18,6 +18,7 @@ import type { CredentialsEntity } from '@db/entities/CredentialsEntity'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; @Service() export class EnterpriseWorkflowService { @@ -26,6 +27,7 @@ export class EnterpriseWorkflowService { private readonly userService: UserService, private readonly roleService: RoleService, private readonly sharedWorkflowRepository: SharedWorkflowRepository, + private readonly workflowRepository: WorkflowRepository, ) {} async isOwned( @@ -182,7 +184,7 @@ export class EnterpriseWorkflowService { } async preventTampering(workflow: WorkflowEntity, workflowId: string, user: User) { - const previousVersion = await this.workflowService.get({ id: workflowId }); + const previousVersion = await this.workflowRepository.get({ id: workflowId }); if (!previousVersion) { throw new NotFoundError('Workflow not found'); diff --git a/packages/cli/src/workflows/workflow.service.ts b/packages/cli/src/workflows/workflow.service.ts index d4854d123dff0..fb2380ccdbf7a 100644 --- a/packages/cli/src/workflows/workflow.service.ts +++ b/packages/cli/src/workflows/workflow.service.ts @@ -1,7 +1,7 @@ import Container, { Service } from 'typedi'; -import type { IDataObject, INode, IPinData } from 'n8n-workflow'; -import { NodeApiError, ErrorReporterProxy as ErrorReporter, Workflow } from 'n8n-workflow'; -import type { FindManyOptions, FindOptionsSelect, FindOptionsWhere, UpdateResult } from 'typeorm'; +import type { INode, IPinData } from 'n8n-workflow'; +import { NodeApiError, Workflow } from 'n8n-workflow'; +import type { FindManyOptions, FindOptionsSelect, FindOptionsWhere } from 'typeorm'; import { In, Like } from 'typeorm'; import pick from 'lodash/pick'; import omit from 'lodash/omit'; @@ -25,7 +25,7 @@ import { whereClause } from '@/UserManagement/UserManagementHelper'; import { InternalHooks } from '@/InternalHooks'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { OwnershipService } from '@/services/ownership.service'; -import { isStringArray, isWorkflowIdValid } from '@/utils'; +import { isStringArray } from '@/utils'; import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee'; import { BinaryDataService } from 'n8n-core'; import type { Scope } from '@n8n/permissions'; @@ -120,13 +120,6 @@ export class WorkflowService { return pinnedTriggers.find((pt) => pt.name === checkNodeName) ?? null; // partial execution } - async get(workflow: FindOptionsWhere, options?: { relations: string[] }) { - return this.workflowRepository.findOne({ - where: workflow, - relations: options?.relations, - }); - } - async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) { if (sharedWorkflowIds.length === 0) return { workflows: [], count: 0 }; @@ -512,56 +505,4 @@ export class WorkflowService { return sharedWorkflow.workflow; } - - async updateWorkflowTriggerCount(id: string, triggerCount: number): Promise { - const qb = this.workflowRepository.createQueryBuilder('workflow'); - return qb - .update() - .set({ - triggerCount, - updatedAt: () => { - if (['mysqldb', 'mariadb'].includes(config.getEnv('database.type'))) { - return 'updatedAt'; - } - return '"updatedAt"'; - }, - }) - .where('id = :id', { id }) - .execute(); - } - - /** - * Saves the static data if it changed - */ - async saveStaticData(workflow: Workflow): Promise { - if (workflow.staticData.__dataChanged === true) { - // Static data of workflow changed and so has to be saved - if (isWorkflowIdValid(workflow.id)) { - // Workflow is saved so update in database - try { - await this.saveStaticDataById(workflow.id, workflow.staticData); - workflow.staticData.__dataChanged = false; - } catch (error) { - ErrorReporter.error(error); - this.logger.error( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem saving the workflow with id "${workflow.id}" to save changed Data: "${error.message}"`, - { workflowId: workflow.id }, - ); - } - } - } - } - - /** - * Saves the given static data on workflow - * - * @param {(string)} workflowId The id of the workflow to save data on - * @param {IDataObject} newStaticData The static data to save - */ - async saveStaticDataById(workflowId: string, newStaticData: IDataObject): Promise { - await this.workflowRepository.update(workflowId, { - staticData: newStaticData, - }); - } } diff --git a/packages/cli/src/workflows/workflowStaticData.service.ts b/packages/cli/src/workflows/workflowStaticData.service.ts new file mode 100644 index 0000000000000..b569c69a3002d --- /dev/null +++ b/packages/cli/src/workflows/workflowStaticData.service.ts @@ -0,0 +1,41 @@ +import { Service } from 'typedi'; +import { type IDataObject, type Workflow, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; +import { Logger } from '@/Logger'; +import { WorkflowRepository } from '@db/repositories/workflow.repository'; +import { isWorkflowIdValid } from '@/utils'; + +@Service() +export class WorkflowStaticDataService { + constructor( + private readonly logger: Logger, + private readonly workflowRepository: WorkflowRepository, + ) {} + + /** Saves the static data if it changed */ + async saveStaticData(workflow: Workflow): Promise { + if (workflow.staticData.__dataChanged === true) { + // Static data of workflow changed and so has to be saved + if (isWorkflowIdValid(workflow.id)) { + // Workflow is saved so update in database + try { + await this.saveStaticDataById(workflow.id, workflow.staticData); + workflow.staticData.__dataChanged = false; + } catch (error) { + ErrorReporter.error(error); + this.logger.error( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + `There was a problem saving the workflow with id "${workflow.id}" to save changed Data: "${error.message}"`, + { workflowId: workflow.id }, + ); + } + } + } + } + + /** Saves the given static data on workflow */ + async saveStaticDataById(workflowId: string, newStaticData: IDataObject): Promise { + await this.workflowRepository.update(workflowId, { + staticData: newStaticData, + }); + } +} diff --git a/packages/cli/src/workflows/workflows.controller.ee.ts b/packages/cli/src/workflows/workflows.controller.ee.ts index 900c8d3660249..27a3abc0cfff2 100644 --- a/packages/cli/src/workflows/workflows.controller.ee.ts +++ b/packages/cli/src/workflows/workflows.controller.ee.ts @@ -28,6 +28,7 @@ import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { InternalServerError } from '@/errors/response-errors/internal-server.error'; import { WorkflowService } from './workflow.service'; +import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; export const EEWorkflowController = express.Router(); @@ -129,7 +130,7 @@ EEWorkflowController.get( relations.push('tags'); } - const workflow = await Container.get(WorkflowService).get({ id: workflowId }, { relations }); + const workflow = await Container.get(WorkflowRepository).get({ id: workflowId }, { relations }); if (!workflow) { throw new NotFoundError(`Workflow with ID "${workflowId}" does not exist`);